Browse Source

Merge branch 'track-api' into webrtc_media

Paul-Louis Ageneau 5 years ago
parent
commit
bc27393e62

+ 4 - 1
CMakeLists.txt

@@ -49,11 +49,13 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/icetransport.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/init.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/log.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/message.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/peerconnection.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
-	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/track.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
 )
 
@@ -83,6 +85,7 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/track.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/websocket.hpp
 )
 

+ 4 - 4
README.md

@@ -118,12 +118,12 @@ config.iceServers.emplace_back("mystunserver.org:3478");
 
 auto pc = make_shared<rtc::PeerConnection>(config);
 
-pc->onLocalDescription([](const rtc::Description &sdp) {
+pc->onLocalDescription([](rtc::Description sdp) {
     // Send the SDP to the remote peer
     MY_SEND_DESCRIPTION_TO_REMOTE(string(sdp));
 });
 
-pc->onLocalCandidate([](const rtc::Candidate &candidate) {
+pc->onLocalCandidate([](rtc::Candidate candidate) {
     // Send the candidate to the remote peer
     MY_SEND_CANDIDATE_TO_REMOTE(candidate.candidate(), candidate.mid());
 });
@@ -159,7 +159,7 @@ dc->onOpen([]() {
     cout << "Open" << endl;
 });
 
-dc->onMessage([](const variant<binary, string> &message) {
+dc->onMessage([](variant<binary, string> message) {
     if (holds_alternative<string>(message)) {
         cout << "Received: " << get<string>(message) << endl;
     }
@@ -186,7 +186,7 @@ ws->onOpen([]() {
 	cout << "WebSocket open" << endl;
 });
 
-ws->onMessage([](const variant<binary, string> &message) {
+ws->onMessage([](variant<binary, string> message) {
     if (holds_alternative<string>(message)) {
         cout << "WebSocket received: " << get<string>(message) << endl;
     }

+ 1 - 1
deps/libjuice

@@ -1 +1 @@
-Subproject commit 0234e89d83d121103aaa8d2cf7183dc0f580d86c
+Subproject commit 9be39ad50bcad5900c26fe1f4a4f7f1de621d040

+ 3 - 3
examples/client/main.cpp

@@ -65,7 +65,7 @@ int main(int argc, char **argv) {
 
 	ws->onError([](const string &error) { cout << "WebSocket failed: " << error << endl; });
 
-	ws->onMessage([&](const variant<binary, string> &data) {
+	ws->onMessage([&](variant<binary, string> data) {
 		if (!holds_alternative<string>(data))
 			return;
 
@@ -166,7 +166,7 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 	pc->onGatheringStateChange(
 	    [](PeerConnection::GatheringState state) { cout << "Gathering State: " << state << endl; });
 
-	pc->onLocalDescription([wws, id](const Description &description) {
+	pc->onLocalDescription([wws, id](Description description) {
 		json message = {
 		    {"id", id}, {"type", description.typeString()}, {"description", string(description)}};
 
@@ -174,7 +174,7 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 			ws->send(message.dump());
 	});
 
-	pc->onLocalCandidate([wws, id](const Candidate &candidate) {
+	pc->onLocalCandidate([wws, id](Candidate candidate) {
 		json message = {{"id", id},
 		                {"type", "candidate"},
 		                {"candidate", string(candidate)},

+ 3 - 3
examples/copy-paste/answerer.cpp

@@ -36,12 +36,12 @@ int main(int argc, char **argv) {
 
 	auto pc = std::make_shared<PeerConnection>(config);
 
-	pc->onLocalDescription([](const Description &description) {
+	pc->onLocalDescription([](Description description) {
 		cout << "Local Description (Paste this to the other peer):" << endl;
 		cout << string(description) << endl;
 	});
 
-	pc->onLocalCandidate([](const Candidate &candidate) {
+	pc->onLocalCandidate([](Candidate candidate) {
 		cout << "Local Candidate (Paste this to the other peer after the local description):"
 		     << endl;
 		cout << string(candidate) << endl << endl;
@@ -60,7 +60,7 @@ int main(int argc, char **argv) {
 
 		dc->onClosed([&]() { cout << "[DataChannel closed: " << dc->label() << "]" << endl; });
 
-		dc->onMessage([](const variant<binary, string> &message) {
+		dc->onMessage([](variant<binary, string> message) {
 			if (holds_alternative<string>(message)) {
 				cout << "[Received message: " << get<string>(message) << "]" << endl;
 			}

+ 3 - 3
examples/copy-paste/offerer.cpp

@@ -36,12 +36,12 @@ int main(int argc, char **argv) {
 
 	auto pc = std::make_shared<PeerConnection>(config);
 
-	pc->onLocalDescription([](const Description &description) {
+	pc->onLocalDescription([](Description description) {
 		cout << "Local Description (Paste this to the other peer):" << endl;
 		cout << string(description) << endl;
 	});
 
-	pc->onLocalCandidate([](const Candidate &candidate) {
+	pc->onLocalCandidate([](Candidate candidate) {
 		cout << "Local Candidate (Paste this to the other peer after the local description):"
 		     << endl;
 		cout << string(candidate) << endl << endl;
@@ -60,7 +60,7 @@ int main(int argc, char **argv) {
 
 	dc->onClosed([&]() { cout << "[DataChannel closed: " << dc->label() << "]" << endl; });
 
-	dc->onMessage([](const variant<binary, string> &message) {
+	dc->onMessage([](variant<binary, string> message) {
 		if (holds_alternative<string>(message)) {
 			cout << "[Received: " << get<string>(message) << "]" << endl;
 		}

+ 10 - 9
include/rtc/channel.hpp

@@ -20,6 +20,7 @@
 #define RTC_CHANNEL_H
 
 #include "include.hpp"
+#include "message.hpp"
 
 #include <atomic>
 #include <functional>
@@ -33,7 +34,7 @@ public:
 	virtual ~Channel() = default;
 
 	virtual void close() = 0;
-	virtual bool send(const std::variant<binary, string> &data) = 0; // returns false if buffered
+	virtual bool send(message_variant data) = 0; // returns false if buffered
 
 	virtual bool isOpen() const = 0;
 	virtual bool isClosed() const = 0;
@@ -42,24 +43,24 @@ public:
 
 	void onOpen(std::function<void()> callback);
 	void onClosed(std::function<void()> callback);
-	void onError(std::function<void(const string &error)> callback);
+	void onError(std::function<void(string error)> callback);
 
-	void onMessage(std::function<void(const std::variant<binary, string> &data)> callback);
-	void onMessage(std::function<void(const binary &data)> binaryCallback,
-	               std::function<void(const string &data)> stringCallback);
+	void onMessage(std::function<void(message_variant data)> callback);
+	void onMessage(std::function<void(binary data)> binaryCallback,
+	               std::function<void(string data)> stringCallback);
 
 	void onBufferedAmountLow(std::function<void()> callback);
 	void setBufferedAmountLowThreshold(size_t amount);
 
 	// Extended API
-	virtual std::optional<std::variant<binary, string>> receive() = 0; // only if onMessage unset
+	virtual std::optional<message_variant> receive() = 0; // only if onMessage unset
 	virtual size_t availableAmount() const; // total size available to receive
 	void onAvailable(std::function<void()> callback);
 
 protected:
 	virtual void triggerOpen();
 	virtual void triggerClosed();
-	virtual void triggerError(const string &error);
+	virtual void triggerError(string error);
 	virtual void triggerAvailable(size_t count);
 	virtual void triggerBufferedAmount(size_t amount);
 
@@ -68,8 +69,8 @@ protected:
 private:
 	synchronized_callback<> mOpenCallback;
 	synchronized_callback<> mClosedCallback;
-	synchronized_callback<const string &> mErrorCallback;
-	synchronized_callback<const std::variant<binary, string> &> mMessageCallback;
+	synchronized_callback<string> mErrorCallback;
+	synchronized_callback<message_variant> mMessageCallback;
 	synchronized_callback<> mAvailableCallback;
 	synchronized_callback<> mBufferedAmountLowCallback;
 

+ 2 - 2
include/rtc/datachannel.hpp

@@ -50,7 +50,7 @@ public:
 	Reliability reliability() const;
 
 	void close(void) override;
-	bool send(const std::variant<binary, string> &data) override;
+	bool send(message_variant data) override;
 	bool send(const byte *data, size_t size);
 	template <typename Buffer> bool sendBuffer(const Buffer &buf);
 	template <typename Iterator> bool sendBuffer(Iterator first, Iterator last);
@@ -61,7 +61,7 @@ public:
 
 	// Extended API
 	size_t availableAmount() const override;
-	std::optional<std::variant<binary, string>> receive() override;
+	std::optional<message_variant> receive() override;
 
 private:
 	void remoteClose();

+ 1 - 1
include/rtc/description.hpp

@@ -139,7 +139,7 @@ private:
 	std::optional<string> mFingerprint;
 
 	Data mData;
-
+  
 	std::map<int, Media> mMedia; // by m-line index
 
 	// Candidates

+ 2 - 0
include/rtc/include.hpp

@@ -64,6 +64,8 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
 const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536;    // Remote max message size if not specified in SDP
 const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
 
+const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // Max per-channel queue size
+
 const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
 
 // overloaded helper

+ 14 - 18
include/rtc/message.hpp

@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2019 Paul-Louis Ageneau
+ * Copyright (c) 2019-2020 Paul-Louis Ageneau
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -24,6 +24,8 @@
 
 #include <functional>
 #include <memory>
+#include <optional>
+#include <variant>
 
 namespace rtc {
 
@@ -46,8 +48,9 @@ struct Message : binary {
 
 using message_ptr = std::shared_ptr<Message>;
 using message_callback = std::function<void(message_ptr message)>;
+using message_variant = std::variant<binary, string>;
 
-constexpr auto message_size_func = [](const message_ptr &m) -> size_t {
+inline size_t message_size_func(const message_ptr &m) {
 	return m->type == Message::Binary || m->type == Message::String ? m->size() : 0;
 };
 
@@ -61,23 +64,16 @@ message_ptr make_message(Iterator begin, Iterator end, Message::Type type = Mess
 	return message;
 }
 
-inline message_ptr make_message(size_t size, Message::Type type = Message::Binary,
-                                unsigned int stream = 0,
-                                std::shared_ptr<Reliability> reliability = nullptr) {
-	auto message = std::make_shared<Message>(size, type);
-	message->stream = stream;
-	message->reliability = reliability;
-	return message;
-}
+message_ptr make_message(size_t size, Message::Type type = Message::Binary, unsigned int stream = 0,
+                         std::shared_ptr<Reliability> reliability = nullptr);
 
-inline message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
-                                unsigned int stream = 0,
-                                std::shared_ptr<Reliability> reliability = nullptr) {
-	auto message = std::make_shared<Message>(std::move(data), type);
-	message->stream = stream;
-	message->reliability = reliability;
-	return message;
-}
+message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
+                         unsigned int stream = 0,
+                         std::shared_ptr<Reliability> reliability = nullptr);
+
+message_ptr make_message(message_variant data);
+
+message_variant to_variant(Message &&message);
 
 } // namespace rtc
 

+ 19 - 17
include/rtc/peerconnection.hpp

@@ -28,6 +28,7 @@
 #include "message.hpp"
 #include "reliability.hpp"
 #include "rtc.hpp"
+#include "track.hpp"
 
 #include <atomic>
 #include <functional>
@@ -85,12 +86,12 @@ public:
 	                          std::optional<Description> mediaDescription = nullopt);
 	void addRemoteCandidate(Candidate candidate);
 
-	std::shared_ptr<DataChannel> createDataChannel(const string &label, const string &protocol = "",
-	                                               const Reliability &reliability = {});
+	std::shared_ptr<DataChannel> createDataChannel(string label, string protocol = "",
+	                                               Reliability reliability = {});
 
 	void onDataChannel(std::function<void(std::shared_ptr<DataChannel> dataChannel)> callback);
-	void onLocalDescription(std::function<void(const Description &description)> callback);
-	void onLocalCandidate(std::function<void(const Candidate &candidate)> callback);
+	void onLocalDescription(std::function<void(Description description)> callback);
+	void onLocalCandidate(std::function<void(Candidate candidate)> callback);
 	void onStateChange(std::function<void(State state)> callback);
 	void onGatheringStateChange(std::function<void(GatheringState state)> callback);
 
@@ -100,13 +101,11 @@ public:
 	size_t bytesReceived();
 	std::optional<std::chrono::milliseconds> rtt();
 
-	// Media
+	// Media support requires compilation with SRTP
 	bool hasMedia() const;
-	void sendMedia(const binary &packet);
-    void sendMedia(const byte *packet, size_t size);
-    void sendMedia(rtc::message_ptr ptr);
 
-	void onMedia(const std::function<void(rtc::message_ptr)>& callback);
+	std::shared_ptr<Track> createTrack(Description::Media description);
+	void onTrack(std::function<void(std::shared_ptr<Track> track)> callback);
 
 	// libnice only
 	bool getSelectedCandidatePair(CandidateInfo *local, CandidateInfo *remote);
@@ -123,18 +122,20 @@ private:
 	void forwardMedia(message_ptr message);
 	void forwardBufferedAmount(uint16_t stream, size_t amount);
 
-	std::shared_ptr<DataChannel> emplaceDataChannel(Description::Role role, const string &label,
-	                                                const string &protocol,
-	                                                const Reliability &reliability);
+	std::shared_ptr<DataChannel> emplaceDataChannel(Description::Role role, string label,
+	                                                string protocol, Reliability reliability);
 	std::shared_ptr<DataChannel> findDataChannel(uint16_t stream);
 	void iterateDataChannels(std::function<void(std::shared_ptr<DataChannel> channel)> func);
 	void openDataChannels();
 	void closeDataChannels();
 	void remoteCloseDataChannels();
 
+	void openTracks();
+
 	void processLocalDescription(Description description);
 	void processLocalCandidate(Candidate candidate);
 	void triggerDataChannel(std::weak_ptr<DataChannel> weakDataChannel);
+	void triggerTrack(std::weak_ptr<Track> weakTrack);
 	bool changeState(State state);
 	bool changeGatheringState(GatheringState state);
 
@@ -154,18 +155,19 @@ private:
 	std::shared_ptr<DtlsTransport> mDtlsTransport;
 	std::shared_ptr<SctpTransport> mSctpTransport;
 
-	std::unordered_map<unsigned int, std::weak_ptr<DataChannel>> mDataChannels;
-	std::shared_mutex mDataChannelsMutex;
+	std::unordered_map<unsigned int, std::weak_ptr<DataChannel>> mDataChannels; // by stream ID
+	std::unordered_map<string, std::weak_ptr<Track>> mTracks;                   // by mid
+	std::shared_mutex mDataChannelsMutex, mTracksMutex;
 
 	std::atomic<State> mState;
 	std::atomic<GatheringState> mGatheringState;
 
 	synchronized_callback<std::shared_ptr<DataChannel>> mDataChannelCallback;
-	synchronized_callback<const Description &> mLocalDescriptionCallback;
-	synchronized_callback<const Candidate &> mLocalCandidateCallback;
+	synchronized_callback<Description> mLocalDescriptionCallback;
+	synchronized_callback<Candidate> mLocalCandidateCallback;
 	synchronized_callback<State> mStateChangeCallback;
 	synchronized_callback<GatheringState> mGatheringStateChangeCallback;
-	synchronized_callback<rtc::message_ptr> mMediaCallback;
+	synchronized_callback<std::shared_ptr<Track>> mTrackCallback;
 };
 
 } // namespace rtc

+ 6 - 0
include/rtc/queue.hpp

@@ -39,6 +39,7 @@ public:
 
 	void stop();
 	bool empty() const;
+	bool full() const;
 	size_t size() const;   // elements
 	size_t amount() const; // amount
 	void push(T element);
@@ -80,6 +81,11 @@ template <typename T> bool Queue<T>::empty() const {
 	return mQueue.empty();
 }
 
+template <typename T> bool Queue<T>::full() const {
+	std::lock_guard lock(mMutex);
+	return mQueue.size() >= mLimit;
+}
+
 template <typename T> size_t Queue<T>::size() const {
 	std::lock_guard lock(mMutex);
 	return mQueue.size();

+ 77 - 0
include/rtc/track.hpp

@@ -0,0 +1,77 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef RTC_TRACK_H
+#define RTC_TRACK_H
+
+#include "channel.hpp"
+#include "description.hpp"
+#include "include.hpp"
+#include "message.hpp"
+#include "queue.hpp"
+
+#include <atomic>
+#include <variant>
+
+namespace rtc {
+
+#if RTC_ENABLE_MEDIA
+class DtlsSrtpTransport;
+#endif
+
+class Track final : public std::enable_shared_from_this<Track>, public Channel {
+public:
+	Track(Description::Media description);
+	~Track() = default;
+
+	string mid() const;
+	Description::Media description() const;
+
+	void close(void) override;
+	bool send(message_variant data) override;
+	bool send(const byte *data, size_t size);
+
+	bool isOpen(void) const override;
+	bool isClosed(void) const override;
+	size_t maxMessageSize() const override;
+
+	// Extended API
+	size_t availableAmount() const override;
+	std::optional<message_variant> receive() override;
+
+private:
+#if RTC_ENABLE_MEDIA
+	void open(std::shared_ptr<DtlsSrtpTransport> transport);
+	std::weak_ptr<DtlsSrtpTransport> mDtlsSrtpTransport;
+#endif
+
+	bool outgoing(message_ptr message);
+	void incoming(message_ptr message);
+
+	Description::Media mMediaDescription;
+	std::atomic<bool> mIsClosed = false;
+
+	Queue<message_ptr> mRecvQueue;
+
+	friend class PeerConnection;
+};
+
+} // namespace rtc
+
+#endif
+

+ 2 - 2
include/rtc/websocket.hpp

@@ -58,14 +58,14 @@ public:
 
 	void open(const string &url);
 	void close() override;
-	bool send(const std::variant<binary, string> &data) override;
+	bool send(const message_variant data) override;
 
 	bool isOpen() const override;
 	bool isClosed() const override;
 	size_t maxMessageSize() const override;
 
 	// Extended API
-	std::optional<std::variant<binary, string>> receive() override;
+	std::optional<message_variant> receive() override;
 	size_t availableAmount() const override; // total size available to receive
 
 private:

+ 7 - 9
src/channel.cpp

@@ -34,11 +34,9 @@ void Channel::onClosed(std::function<void()> callback) {
 	mClosedCallback = callback;
 }
 
-void Channel::onError(std::function<void(const string &error)> callback) {
-	mErrorCallback = callback;
-}
+void Channel::onError(std::function<void(string error)> callback) { mErrorCallback = callback; }
 
-void Channel::onMessage(std::function<void(const std::variant<binary, string> &data)> callback) {
+void Channel::onMessage(std::function<void(message_variant data)> callback) {
 	mMessageCallback = callback;
 
 	// Pass pending messages
@@ -46,10 +44,10 @@ void Channel::onMessage(std::function<void(const std::variant<binary, string> &d
 		mMessageCallback(*message);
 }
 
-void Channel::onMessage(std::function<void(const binary &data)> binaryCallback,
-                        std::function<void(const string &data)> stringCallback) {
-	onMessage([binaryCallback, stringCallback](const std::variant<binary, string> &data) {
-		std::visit(overloaded{binaryCallback, stringCallback}, data);
+void Channel::onMessage(std::function<void(binary data)> binaryCallback,
+                        std::function<void(string data)> stringCallback) {
+	onMessage([binaryCallback, stringCallback](std::variant<binary, string> data) {
+		std::visit(overloaded{binaryCallback, stringCallback}, std::move(data));
 	});
 }
 
@@ -67,7 +65,7 @@ void Channel::triggerOpen() { mOpenCallback(); }
 
 void Channel::triggerClosed() { mClosedCallback(); }
 
-void Channel::triggerError(const string &error) { mErrorCallback(error); }
+void Channel::triggerError(string error) { mErrorCallback(error); }
 
 void Channel::triggerAvailable(size_t count) {
 	if (count == 1)

+ 7 - 27
src/datachannel.cpp

@@ -72,8 +72,6 @@ struct CloseMessage {
 };
 #pragma pack(pop)
 
-const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // 1 MiB
-
 DataChannel::DataChannel(weak_ptr<PeerConnection> pc, unsigned int stream, string label,
                          string protocol, Reliability reliability)
     : mPeerConnection(pc), mStream(stream), mLabel(std::move(label)),
@@ -117,39 +115,21 @@ void DataChannel::remoteClose() {
 	mSctpTransport.reset();
 }
 
-bool DataChannel::send(const std::variant<binary, string> &data) {
-	return std::visit(
-	    [&](const auto &d) {
-		    using T = std::decay_t<decltype(d)>;
-		    constexpr auto type = std::is_same_v<T, string> ? Message::String : Message::Binary;
-		    auto *b = reinterpret_cast<const byte *>(d.data());
-		    return outgoing(std::make_shared<Message>(b, b + d.size(), type));
-	    },
-	    data);
-}
+bool DataChannel::send(message_variant data) { return outgoing(make_message(std::move(data))); }
 
 bool DataChannel::send(const byte *data, size_t size) {
 	return outgoing(std::make_shared<Message>(data, data + size, Message::Binary));
 }
 
-std::optional<std::variant<binary, string>> DataChannel::receive() {
+std::optional<message_variant> DataChannel::receive() {
 	while (!mRecvQueue.empty()) {
 		auto message = *mRecvQueue.pop();
-		switch (message->type) {
-		case Message::Control: {
+		if (message->type == Message::Control) {
 			auto raw = reinterpret_cast<const uint8_t *>(message->data());
-			if (raw[0] == MESSAGE_CLOSE)
+			if (!message->empty() && raw[0] == MESSAGE_CLOSE)
 				remoteClose();
-			break;
-		}
-		case Message::String:
-			return std::make_optional(
-			    string(reinterpret_cast<const char *>(message->data()), message->size()));
-		case Message::Binary:
-			return std::make_optional(std::move(*message));
-		default:
-			// Ignore
-			break;
+		} else {
+			return to_variant(std::move(*message));
 		}
 	}
 
@@ -223,7 +203,7 @@ bool DataChannel::outgoing(message_ptr message) {
 
 	auto transport = mSctpTransport.lock();
 	if (!transport)
-		throw std::runtime_error("DataChannel has no transport");
+		throw std::runtime_error("DataChannel transport is not open");
 
 	// Before the ACK has been received on a DataChannel, all messages must be sent ordered
 	message->reliability = mIsOpen ? mReliability : nullptr;

+ 13 - 7
src/dtlssrtptransport.cpp

@@ -52,7 +52,7 @@ DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
 #else
 	PLOG_DEBUG << "Setting SRTP profile (OpenSSL)";
 	// returns 0 on success, 1 on error
-	if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"), "Failed to set SRTP profile")
+	if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"))
 		throw std::runtime_error("Failed to set SRTP profile: " +
 		                         openssl::error_string(ERR_get_error()));
 #endif
@@ -170,23 +170,29 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
 			if (srtp_err_status_t err = srtp_unprotect_rtcp(mSrtpIn, message->data(), &size)) {
 				if (err == srtp_err_status_replay_fail)
 					PLOG_WARNING << "Incoming SRTCP packet is a replay";
+				else if (err == srtp_err_status_auth_fail)
+					PLOG_WARNING << "Incoming SRTCP packet failed authentication check";
 				else
 					PLOG_WARNING << "SRTCP unprotect error, status=" << err;
 				return;
 			}
 			PLOG_VERBOSE << "Unprotected SRTCP packet, size=" << size;
-            message->type = rtc::Message::Control;
+			message->type = Message::Type::Control;
+			message->stream = to_integer<uint8_t>(*(message->begin() + 1)); // Payload Type
 		} else {
 			PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
 			if (srtp_err_status_t err = srtp_unprotect(mSrtpIn, message->data(), &size)) {
 				if (err == srtp_err_status_replay_fail)
 					PLOG_WARNING << "Incoming SRTP packet is a replay";
+				else if (err == srtp_err_status_auth_fail)
+					PLOG_WARNING << "Incoming SRTP packet failed authentication check";
 				else
 					PLOG_WARNING << "SRTP unprotect error, status=" << err;
 				return;
 			}
 			PLOG_VERBOSE << "Unprotected SRTP packet, size=" << size;
-			message->type = rtc::Message::Binary;
+			message->type = Message::Type::Binary;
+			message->stream = value2; // Payload Type
 		}
 
 		message->resize(size);
@@ -242,11 +248,11 @@ void DtlsSrtpTransport::postHandshake() {
 		throw std::runtime_error("Failed to derive SRTP keys: " +
 		                         openssl::error_string(ERR_get_error()));
 
+	// Order is client key, server key, client salt, and server salt
 	clientKey = material;
-	clientSalt = clientKey + SRTP_AES_128_KEY_LEN;
-
-	serverKey = material + SRTP_AES_ICM_128_KEY_LEN_WSALT;
-	serverSalt = serverKey + SRTP_AES_128_KEY_LEN;
+	serverKey = clientKey + SRTP_AES_128_KEY_LEN;
+	clientSalt = serverKey + SRTP_AES_128_KEY_LEN;
+	serverSalt = clientSalt + SRTP_SALT_LEN;
 #endif
 
 	unsigned char clientSessionKey[SRTP_AES_ICM_128_KEY_LEN_WSALT];

+ 60 - 0
src/message.cpp

@@ -0,0 +1,60 @@
+/**
+ * Copyright (c) 2019-2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "message.hpp"
+
+namespace rtc {
+
+message_ptr make_message(size_t size, Message::Type type, unsigned int stream,
+                         std::shared_ptr<Reliability> reliability) {
+	auto message = std::make_shared<Message>(size, type);
+	message->stream = stream;
+	message->reliability = reliability;
+	return message;
+}
+
+message_ptr make_message(binary &&data, Message::Type type, unsigned int stream,
+                         std::shared_ptr<Reliability> reliability) {
+	auto message = std::make_shared<Message>(std::move(data), type);
+	message->stream = stream;
+	message->reliability = reliability;
+	return message;
+}
+
+message_ptr make_message(message_variant data) {
+	return std::visit( //
+	    overloaded{
+	        [&](binary data) { return make_message(std::move(data), Message::Binary); },
+	        [&](string data) {
+		        auto b = reinterpret_cast<const byte *>(data.data());
+		        return make_message(b, b + data.size(), Message::String);
+	        },
+	    },
+	    std::move(data));
+}
+
+message_variant to_variant(Message &&message) {
+	switch (message.type) {
+	case Message::String:
+		return string(reinterpret_cast<const char *>(message.data()), message.size());
+	default:
+		return std::move(message);
+	}
+}
+
+} // namespace rtc

+ 71 - 56
src/peerconnection.cpp

@@ -102,6 +102,9 @@ void PeerConnection::setRemoteDescription(Description description,
                                           std::optional<Description> mediaDescription) {
 	PLOG_VERBOSE << "Setting remote description: " << string(description);
 
+	if (!description.fingerprint())
+		throw std::runtime_error("Remote description is incomplete");
+
 	description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer);
 	auto type = description.type();
 	auto remoteCandidates = description.extractCandidates(); // Candidates will be added at the end
@@ -182,9 +185,8 @@ std::optional<string> PeerConnection::remoteAddress() const {
 	return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
 }
 
-shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
-                                                          const string &protocol,
-                                                          const Reliability &reliability) {
+shared_ptr<DataChannel> PeerConnection::createDataChannel(string label, string protocol,
+                                                          Reliability reliability) {
 	// RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
 	// setup:passive. [...] Thus, setup:active is RECOMMENDED.
 	// See https://tools.ietf.org/html/rfc5763#section-5
@@ -192,7 +194,8 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
 	auto iceTransport = std::atomic_load(&mIceTransport);
 	auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
 
-	auto channel = emplaceDataChannel(role, label, protocol, reliability);
+	auto channel =
+	    emplaceDataChannel(role, std::move(label), std::move(protocol), std::move(reliability));
 
 	if (!iceTransport) {
 		// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
@@ -214,12 +217,11 @@ void PeerConnection::onDataChannel(
 	mDataChannelCallback = callback;
 }
 
-void PeerConnection::onLocalDescription(
-    std::function<void(const Description &description)> callback) {
+void PeerConnection::onLocalDescription(std::function<void(Description description)> callback) {
 	mLocalDescriptionCallback = callback;
 }
 
-void PeerConnection::onLocalCandidate(std::function<void(const Candidate &candidate)> callback) {
+void PeerConnection::onLocalCandidate(std::function<void(Candidate candidate)> callback) {
 	mLocalCandidateCallback = callback;
 }
 
@@ -237,35 +239,17 @@ bool PeerConnection::hasMedia() const {
 	return (local && local->hasMedia()) || (remote && remote->hasMedia());
 }
 
-void PeerConnection::sendMedia(const binary &packet) {
-	outgoingMedia(make_message(packet.begin(), packet.end(), Message::Binary));
-}
-
-void PeerConnection::sendMedia(const byte *packet, size_t size) {
-	outgoingMedia(make_message(packet, packet + size, Message::Binary));
-}
+std::shared_ptr<Track> PeerConnection::createTrack(Description::Media description) {
+	if (localDescription())
+		throw std::logic_error("Tracks must be created before local description");
 
-void PeerConnection::sendMedia(message_ptr ptr) {
-    outgoingMedia(std::move(ptr));
+	auto track = std::make_shared<Track>(std::move(description));
+	mTracks.emplace(std::make_pair(track->mid(), track));
+	return track;
 }
 
-void PeerConnection::onMedia(const std::function<void(rtc::message_ptr)>& callback) {
-	mMediaCallback = callback;
-}
-
-void PeerConnection::outgoingMedia([[maybe_unused]] message_ptr message) {
-	if (!hasMedia())
-		throw std::runtime_error("PeerConnection has no media support");
-
-#if RTC_ENABLE_MEDIA
-	auto transport = std::atomic_load(&mDtlsTransport);
-	if (!transport)
-		throw std::runtime_error("PeerConnection is not open");
-
-	std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->sendMedia(message);
-#else
-	PLOG_WARNING << "Ignoring sent media (not compiled with SRTP support)";
-#endif
+void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callback) {
+	mTrackCallback = callback;
 }
 
 shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
@@ -346,7 +330,8 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
 
 			switch (state) {
 			case DtlsTransport::State::Connected:
-                initSctpTransport();
+				initSctpTransport();
+				openTracks();
 				break;
 			case DtlsTransport::State::Failed:
 				changeState(State::Failed);
@@ -529,8 +514,16 @@ void PeerConnection::forwardMessage(message_ptr message) {
 }
 
 void PeerConnection::forwardMedia(message_ptr message) {
-	if (message)
-		mMediaCallback(message);
+	if (!message)
+		return;
+
+	string mid;
+	// TODO: stream (PT) to mid
+
+	std::shared_lock lock(mTracksMutex); // read-only
+	if (auto it = mTracks.find(mid); it != mTracks.end())
+		if (auto track = it->second.lock())
+			track->incoming(message);
 }
 
 void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
@@ -538,10 +531,9 @@ void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
 		channel->triggerBufferedAmount(amount);
 }
 
-shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role,
-                                                           const string &label,
-                                                           const string &protocol,
-                                                           const Reliability &reliability) {
+shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role, string label,
+                                                           string protocol,
+                                                           Reliability reliability) {
 	// The active side must use streams with even identifiers, whereas the passive side must use
 	// streams with odd identifiers.
 	// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09#section-6
@@ -552,8 +544,8 @@ shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role rol
 		if (stream >= 65535)
 			throw std::runtime_error("Too many DataChannels");
 	}
-	auto channel =
-	    std::make_shared<DataChannel>(shared_from_this(), stream, label, protocol, reliability);
+	auto channel = std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
+	                                             std::move(protocol), std::move(reliability));
 	mDataChannels.emplace(std::make_pair(stream, channel));
 	return channel;
 }
@@ -602,6 +594,21 @@ void PeerConnection::openDataChannels() {
 		iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
 }
 
+void PeerConnection::openTracks() {
+#if RTC_ENABLE_MEDIA
+	if (!hasMedia())
+		return;
+
+	if (auto transport = std::atomic_load(&mDtlsTransport)) {
+		auto srtpTransport = std::reinterpret_pointer_cast<DtlsSrtpTransport>(transport);
+		std::shared_lock lock(mTracksMutex); // read-only
+		for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
+			if (auto track = it->second.lock())
+				track->open(srtpTransport);
+	}
+#endif
+}
+
 void PeerConnection::closeDataChannels() {
 	iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
 }
@@ -618,20 +625,19 @@ void PeerConnection::processLocalDescription(Description description) {
 	    remoteSctpPort = remote->sctpPort();
 	}
 
-	auto certificate = mCertificate.get(); // wait for certificate if not ready
+	if (remoteDataMid)
+		description.setDataMid(*remoteDataMid);
 
-	{
-		std::lock_guard lock(mLocalDescriptionMutex);
-		mLocalDescription.emplace(std::move(description));
-		if (remoteDataMid)
-			mLocalDescription->setDataMid(*remoteDataMid);
-
-		mLocalDescription->setFingerprint(certificate->fingerprint());
-		mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
-		mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
-	}
+	description.setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
+	description.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
+	description.setFingerprint(mCertificate.get()->fingerprint()); // wait for certificate
 
-	mProcessor->enqueue([this]() { mLocalDescriptionCallback(*mLocalDescription); });
+	std::lock_guard lock(mLocalDescriptionMutex);
+	mLocalDescription.emplace(std::move(description));
+
+	mProcessor->enqueue([this, description = *mLocalDescription]() {
+		mLocalDescriptionCallback(std::move(description));
+	});
 }
 
 void PeerConnection::processLocalCandidate(Candidate candidate) {
@@ -641,8 +647,9 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
 
 	mLocalDescription->addCandidate(candidate);
 
-	mProcessor->enqueue(
-	    [this, candidate = std::move(candidate)]() { mLocalCandidateCallback(candidate); });
+	mProcessor->enqueue([this, candidate = std::move(candidate)]() {
+		mLocalCandidateCallback(std::move(candidate));
+	});
 }
 
 void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
@@ -654,6 +661,14 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 	    [this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
 }
 
+void PeerConnection::triggerTrack(std::weak_ptr<Track> weakTrack) {
+	auto track = weakTrack.lock();
+	if (!track)
+		return;
+
+	mProcessor->enqueue([this, track = std::move(track)]() { mTrackCallback(track); });
+}
+
 bool PeerConnection::changeState(State state) {
 	State current;
 	do {

+ 7 - 7
src/rtc.cpp

@@ -341,7 +341,7 @@ int rtcSetLocalDescriptionCallback(int pc, rtcDescriptionCallbackFunc cb) {
 	return WRAP({
 		auto peerConnection = getPeerConnection(pc);
 		if (cb)
-			peerConnection->onLocalDescription([pc, cb](const Description &desc) {
+			peerConnection->onLocalDescription([pc, cb](Description desc) {
 				if (auto ptr = getUserPointer(pc))
 					cb(string(desc).c_str(), desc.typeString().c_str(), *ptr);
 			});
@@ -354,7 +354,7 @@ int rtcSetLocalCandidateCallback(int pc, rtcCandidateCallbackFunc cb) {
 	return WRAP({
 		auto peerConnection = getPeerConnection(pc);
 		if (cb)
-			peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) {
+			peerConnection->onLocalCandidate([pc, cb](Candidate cand) {
 				if (auto ptr = getUserPointer(pc))
 					cb(cand.candidate().c_str(), cand.mid().c_str(), *ptr);
 			});
@@ -542,7 +542,7 @@ int rtcSetErrorCallback(int id, rtcErrorCallbackFunc cb) {
 	return WRAP({
 		auto channel = getChannel(id);
 		if (cb)
-			channel->onError([id, cb](const string &error) {
+			channel->onError([id, cb](string error) {
 				if (auto ptr = getUserPointer(id))
 					cb(error.c_str(), *ptr);
 			});
@@ -556,11 +556,11 @@ int rtcSetMessageCallback(int id, rtcMessageCallbackFunc cb) {
 		auto channel = getChannel(id);
 		if (cb)
 			channel->onMessage(
-			    [id, cb](const binary &b) {
+			    [id, cb](binary b) {
 				    if (auto ptr = getUserPointer(id))
 					    cb(reinterpret_cast<const char *>(b.data()), int(b.size()), *ptr);
 			    },
-			    [id, cb](const string &s) {
+			    [id, cb](string s) {
 				    if (auto ptr = getUserPointer(id))
 					    cb(s.c_str(), -int(s.size() + 1), *ptr);
 			    });
@@ -643,13 +643,13 @@ int rtcReceiveMessage(int id, char *buffer, int *size) {
 		if (auto message = channel->receive())
 			return std::visit( //
 			    overloaded{    //
-			               [&](const binary &b) {
+			               [&](binary b) {
 				               *size = std::min(*size, int(b.size()));
 				               auto data = reinterpret_cast<const char *>(b.data());
 				               std::copy(data, data + *size, buffer);
 				               return 1;
 			               },
-			               [&](const string &s) {
+			               [&](string s) {
 				               int len = std::min(*size - 1, int(s.size()));
 				               if (len >= 0) {
 					               std::copy(s.data(), s.data() + len, buffer);

+ 25 - 2
src/sctptransport.cpp

@@ -600,7 +600,7 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
 	}
 
 	auto type = notify->sn_header.sn_type;
-	PLOG_VERBOSE << "Process notification, type=" << type;
+	PLOG_VERBOSE << "Processing notification, type=" << type;
 
 	switch (type) {
 	case SCTP_ASSOC_CHANGE: {
@@ -622,7 +622,8 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
 	}
 
 	case SCTP_SENDER_DRY_EVENT: {
-		// It not should be necessary since the send callback should have been called already,
+		PLOG_VERBOSE << "SCTP dry event";
+		// It should not be necessary since the send callback should have been called already,
 		// but to be sure, let's try to send now.
 		safeFlush();
 		break;
@@ -633,6 +634,28 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
 		const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
 		const uint16_t flags = reset_event.strreset_flags;
 
+		IF_PLOG(plog::verbose) {
+			std::ostringstream desc;
+			desc << "flags=";
+			if (flags & SCTP_STREAM_RESET_OUTGOING_SSN && flags & SCTP_STREAM_RESET_INCOMING_SSN)
+				desc << "outgoing|incoming";
+			else if (flags & SCTP_STREAM_RESET_OUTGOING_SSN)
+				desc << "outgoing";
+			else if (flags & SCTP_STREAM_RESET_INCOMING_SSN)
+				desc << "incoming";
+			else
+				desc << "0";
+
+			desc << ", streams=[";
+			for (int i = 0; i < count; ++i) {
+				uint16_t streamId = reset_event.strreset_stream_list[i];
+				desc << (i != 0 ? "," : "") << streamId;
+			}
+			desc << "]";
+
+			PLOG_VERBOSE << "SCTP reset event, " << desc.str();
+		}
+
 		if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
 			for (int i = 0; i < count; ++i) {
 				uint16_t streamId = reset_event.strreset_stream_list[i];

+ 1 - 2
src/tls.cpp

@@ -85,10 +85,9 @@ void init() {
 	static bool done = false;
 
 	std::lock_guard lock(mutex);
-	if (!done) {
+	if (!std::exchange(done, true)) {
 		OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
 		OPENSSL_init_crypto(OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
-		done = true;
 	}
 }
 

+ 107 - 0
src/track.cpp

@@ -0,0 +1,107 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "track.hpp"
+#include "dtlssrtptransport.hpp"
+#include "include.hpp"
+
+namespace rtc {
+
+using std::shared_ptr;
+using std::weak_ptr;
+
+Track::Track(Description::Media description)
+    : mMediaDescription(std::move(description)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
+
+string Track::mid() const { return mMediaDescription.mid; }
+
+Description::Media Track::description() const { return mMediaDescription; }
+
+void Track::close() {
+	mIsClosed = true;
+	resetCallbacks();
+}
+
+bool Track::send(message_variant data) { return outgoing(make_message(std::move(data))); }
+
+bool Track::send(const byte *data, size_t size) {
+	return outgoing(std::make_shared<Message>(data, data + size, Message::Binary));
+}
+
+std::optional<message_variant> Track::receive() {
+	if (!mRecvQueue.empty())
+		return to_variant(std::move(**mRecvQueue.pop()));
+
+	return nullopt;
+}
+
+bool Track::isOpen(void) const {
+#if RTC_ENABLE_MEDIA
+	return !mIsClosed && mDtlsSrtpTransport.lock();
+#else
+	return !mIsClosed;
+#endif
+}
+
+bool Track::isClosed(void) const { return mIsClosed; }
+
+size_t Track::maxMessageSize() const {
+	return 65535 - 12 - 4; // SRTP/UDP
+}
+
+size_t Track::availableAmount() const {
+	return mRecvQueue.amount();
+}
+
+#if RTC_ENABLE_MEDIA
+void Track::open(shared_ptr<DtlsSrtpTransport> transport) { mDtlsSrtpTransport = transport; }
+#endif
+
+bool Track::outgoing(message_ptr message) {
+	if (mIsClosed)
+		throw std::runtime_error("Track is closed");
+
+	if (message->size() > maxMessageSize())
+		throw std::runtime_error("Message size exceeds limit");
+
+#if RTC_ENABLE_MEDIA
+	auto transport = mDtlsSrtpTransport.lock();
+	if (!transport)
+		throw std::runtime_error("Track transport is not open");
+
+	return transport->sendMedia(message);
+#else
+	PLOG_WARNING << "Ignoring track send (not compiled with SRTP support)";
+	return false;
+#endif
+}
+
+void Track::incoming(message_ptr message) {
+	if (!message)
+		return;
+
+	// Tail drop if queue is full
+	if (mRecvQueue.full())
+		return;
+
+	mRecvQueue.push(message);
+	triggerAvailable(mRecvQueue.size());
+}
+
+} // namespace rtc
+

+ 7 - 23
src/websocket.cpp

@@ -38,7 +38,8 @@ namespace rtc {
 using std::shared_ptr;
 
 WebSocket::WebSocket(std::optional<Configuration> config)
-    : mConfig(config ? std::move(*config) : Configuration()) {
+    : mConfig(config ? std::move(*config) : Configuration()),
+      mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {
 	PLOG_VERBOSE << "Creating WebSocket";
 }
 
@@ -100,16 +101,7 @@ void WebSocket::remoteClose() {
 	}
 }
 
-bool WebSocket::send(const std::variant<binary, string> &data) {
-	return std::visit(
-	    [&](const auto &d) {
-		    using T = std::decay_t<decltype(d)>;
-		    constexpr auto type = std::is_same_v<T, string> ? Message::String : Message::Binary;
-		    auto *b = reinterpret_cast<const byte *>(d.data());
-		    return outgoing(std::make_shared<Message>(b, b + d.size(), type));
-	    },
-	    data);
-}
+bool WebSocket::send(message_variant data) { return outgoing(make_message(std::move(data))); }
 
 bool WebSocket::isOpen() const { return mState == State::Open; }
 
@@ -117,19 +109,11 @@ bool WebSocket::isClosed() const { return mState == State::Closed; }
 
 size_t WebSocket::maxMessageSize() const { return DEFAULT_MAX_MESSAGE_SIZE; }
 
-std::optional<std::variant<binary, string>> WebSocket::receive() {
+std::optional<message_variant> WebSocket::receive() {
 	while (!mRecvQueue.empty()) {
 		auto message = *mRecvQueue.pop();
-		switch (message->type) {
-		case Message::String:
-			return std::make_optional(
-			    string(reinterpret_cast<const char *>(message->data()), message->size()));
-		case Message::Binary:
-			return std::make_optional(std::move(*message));
-		default:
-			// Ignore
-			break;
-		}
+		if (message->type != Message::Control)
+			return to_variant(std::move(*message));
 	}
 	return nullopt;
 }
@@ -337,6 +321,6 @@ void WebSocket::closeTransports() {
 	});
 }
 
-} // namespace rtc
+	} // namespace rtc
 
 #endif

+ 22 - 23
test/benchmark.cpp

@@ -48,20 +48,20 @@ size_t benchmark(milliseconds duration) {
 
 	auto pc2 = std::make_shared<PeerConnection>(config2);
 
-	pc1->onLocalDescription([wpc2 = make_weak_ptr(pc2)](const Description &sdp) {
+	pc1->onLocalDescription([wpc2 = make_weak_ptr(pc2)](Description sdp) {
 		auto pc2 = wpc2.lock();
 		if (!pc2)
 			return;
 		cout << "Description 1: " << sdp << endl;
-		pc2->setRemoteDescription(sdp);
+		pc2->setRemoteDescription(std::move(sdp));
 	});
 
-	pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](const Candidate &candidate) {
+	pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](Candidate candidate) {
 		auto pc2 = wpc2.lock();
 		if (!pc2)
 			return;
 		cout << "Candidate 1: " << candidate << endl;
-		pc2->addRemoteCandidate(candidate);
+		pc2->addRemoteCandidate(std::move(candidate));
 	});
 
 	pc1->onStateChange([](PeerConnection::State state) { cout << "State 1: " << state << endl; });
@@ -69,20 +69,20 @@ size_t benchmark(milliseconds duration) {
 		cout << "Gathering state 1: " << state << endl;
 	});
 
-	pc2->onLocalDescription([wpc1 = make_weak_ptr(pc1)](const Description &sdp) {
+	pc2->onLocalDescription([wpc1 = make_weak_ptr(pc1)](Description sdp) {
 		auto pc1 = wpc1.lock();
 		if (!pc1)
 			return;
 		cout << "Description 2: " << sdp << endl;
-		pc1->setRemoteDescription(sdp);
+		pc1->setRemoteDescription(std::move(sdp));
 	});
 
-	pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](const Candidate &candidate) {
+	pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](Candidate candidate) {
 		auto pc1 = wpc1.lock();
 		if (!pc1)
 			return;
 		cout << "Candidate 2: " << candidate << endl;
-		pc1->addRemoteCandidate(candidate);
+		pc1->addRemoteCandidate(std::move(candidate));
 	});
 
 	pc2->onStateChange([](PeerConnection::State state) { cout << "State 2: " << state << endl; });
@@ -99,21 +99,20 @@ size_t benchmark(milliseconds duration) {
 	steady_clock::time_point startTime, openTime, receivedTime, endTime;
 
 	shared_ptr<DataChannel> dc2;
-	pc2->onDataChannel(
-	    [&dc2, &receivedSize, &receivedTime](shared_ptr<DataChannel> dc) {
-		    dc->onMessage([&receivedTime, &receivedSize](const variant<binary, string> &message) {
-			    if (holds_alternative<binary>(message)) {
-				    const auto &bin = get<binary>(message);
-				    if (receivedSize == 0)
-					    receivedTime = steady_clock::now();
-				    receivedSize += bin.size();
-			    }
-		    });
-
-		    dc->onClosed([]() { cout << "DataChannel closed." << endl; });
-
-		    std::atomic_store(&dc2, dc);
-	    });
+	pc2->onDataChannel([&dc2, &receivedSize, &receivedTime](shared_ptr<DataChannel> dc) {
+		dc->onMessage([&receivedTime, &receivedSize](variant<binary, string> message) {
+			if (holds_alternative<binary>(message)) {
+				const auto &bin = get<binary>(message);
+				if (receivedSize == 0)
+					receivedTime = steady_clock::now();
+				receivedSize += bin.size();
+			}
+		});
+
+		dc->onClosed([]() { cout << "DataChannel closed." << endl; });
+
+		std::atomic_store(&dc2, dc);
+	});
 
 	startTime = steady_clock::now();
 	auto dc1 = pc1->createDataChannel("benchmark");

+ 9 - 9
test/connectivity.cpp

@@ -47,20 +47,20 @@ void test_connectivity() {
 
 	auto pc2 = std::make_shared<PeerConnection>(config2);
 
-	pc1->onLocalDescription([wpc2 = make_weak_ptr(pc2)](const Description &sdp) {
+	pc1->onLocalDescription([wpc2 = make_weak_ptr(pc2)](Description sdp) {
 		auto pc2 = wpc2.lock();
 		if (!pc2)
 			return;
 		cout << "Description 1: " << sdp << endl;
-		pc2->setRemoteDescription(sdp);
+		pc2->setRemoteDescription(std::move(sdp));
 	});
 
-	pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](const Candidate &candidate) {
+	pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](Candidate candidate) {
 		auto pc2 = wpc2.lock();
 		if (!pc2)
 			return;
 		cout << "Candidate 1: " << candidate << endl;
-		pc2->addRemoteCandidate(candidate);
+		pc2->addRemoteCandidate(std::move(candidate));
 	});
 
 	pc1->onStateChange([](PeerConnection::State state) { cout << "State 1: " << state << endl; });
@@ -69,20 +69,20 @@ void test_connectivity() {
 		cout << "Gathering state 1: " << state << endl;
 	});
 
-	pc2->onLocalDescription([wpc1 = make_weak_ptr(pc1)](const Description &sdp) {
+	pc2->onLocalDescription([wpc1 = make_weak_ptr(pc1)](Description sdp) {
 		auto pc1 = wpc1.lock();
 		if (!pc1)
 			return;
 		cout << "Description 2: " << sdp << endl;
-		pc1->setRemoteDescription(sdp);
+		pc1->setRemoteDescription(std::move(sdp));
 	});
 
-	pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](const Candidate &candidate) {
+	pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](Candidate candidate) {
 		auto pc1 = wpc1.lock();
 		if (!pc1)
 			return;
 		cout << "Candidate 2: " << candidate << endl;
-		pc1->addRemoteCandidate(candidate);
+		pc1->addRemoteCandidate(std::move(candidate));
 	});
 
 	pc2->onStateChange([](PeerConnection::State state) { cout << "State 2: " << state << endl; });
@@ -95,7 +95,7 @@ void test_connectivity() {
 	pc2->onDataChannel([&dc2](shared_ptr<DataChannel> dc) {
 		cout << "DataChannel 2: Received with label \"" << dc->label() << "\"" << endl;
 
-		dc->onMessage([](const variant<binary, string> &message) {
+		dc->onMessage([](variant<binary, string> message) {
 			if (holds_alternative<string>(message)) {
 				cout << "Message 2: " << get<string>(message) << endl;
 			}

+ 2 - 2
test/websocket.cpp

@@ -53,9 +53,9 @@ void test_websocket() {
 	ws->onClosed([]() { cout << "WebSocket: Closed" << endl; });
 
 	std::atomic<bool> received = false;
-	ws->onMessage([&received, &myMessage](const variant<binary, string> &message) {
+	ws->onMessage([&received, &myMessage](variant<binary, string> message) {
 		if (holds_alternative<string>(message)) {
-			string str = get<string>(message);
+			string str = std::move(get<string>(message));
 			if((received = (str == myMessage)))
 				cout << "WebSocket: Received expected message" << endl;
 			else