Browse Source

Merge pull request #153 from paullouisageneau/track-api

Add Track API
Paul-Louis Ageneau 4 years ago
parent
commit
a18e9c50c4

+ 5 - 1
CMakeLists.txt

@@ -53,8 +53,9 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/peerconnection.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
-	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/track.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
 )
 
@@ -74,6 +75,7 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/configuration.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/datachannel.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/description.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtp.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/include.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/init.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/log.hpp
@@ -83,6 +85,7 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/track.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/websocket.hpp
 )
 
@@ -265,6 +268,7 @@ if(NOT NO_EXAMPLES)
 	set(JSON_BuildTests OFF CACHE INTERNAL "")
 	add_subdirectory(deps/json)
 	add_subdirectory(examples/client)
+	add_subdirectory(examples/media)
 	add_subdirectory(examples/copy-paste)
 	add_subdirectory(examples/copy-paste-capi)
 endif()

+ 1 - 1
deps/libjuice

@@ -1 +1 @@
-Subproject commit 9be39ad50bcad5900c26fe1f4a4f7f1de621d040
+Subproject commit d8179c83003043f1b7f5aac8fa10ba61aa634da2

+ 2 - 0
examples/README.md

@@ -7,6 +7,8 @@ This directory contains different WebRTC clients and compatible WebSocket + JSON
 - [signaling-server-python](signaling-server-python) contains a similar signaling server in Python
 - [signaling-server-rust](signaling-server-rust) contains a similar signaling server in Rust (see [lerouxrgd/datachannel-rs](https://github.com/lerouxrgd/datachannel-rs) for Rust wrappers)
 
+- [media](media) is a copy/paste demo to send the webcam from your browser into gstreamer.
+
 Additionally, it contains two debugging tools for libdatachannel with copy-pasting as signaling:
 - [copy-paste](copy-paste) using the C++ API
 - [copy-paste-capi](copy-paste-capi) using the C API

+ 14 - 0
examples/media/CMakeLists.txt

@@ -0,0 +1,14 @@
+cmake_minimum_required(VERSION 3.7)
+
+add_executable(datachannel-media main.cpp)
+set_target_properties(datachannel-media PROPERTIES
+        CXX_STANDARD 17
+        OUTPUT_NAME media)
+
+if(WIN32)
+    target_link_libraries(datachannel-media datachannel-static) # DLL exports only the C API
+else()
+    target_link_libraries(datachannel-media datachannel)
+endif()
+
+target_link_libraries(datachannel-media datachannel nlohmann_json)

+ 19 - 0
examples/media/README.md

@@ -0,0 +1,19 @@
+# Example Webcam from Browser to Port 5000
+This is an example copy/paste demo to send your webcam from your browser and out port 5000 through the demo application.
+
+## How to use
+Open main.html in your browser (you must open it either as HTTPS or as a domain of http://localhost).
+
+Start the application and copy it's offer into the text box of the web page.
+
+Copy the answer of the webpage back into the application.
+
+You will now see RTP traffic on `localhost:5000` of the computer that the application is running on.
+
+Use the following gstreamer demo pipeline to display the traffic 
+(you might need to wave your hand in front of your camera to force an I-frame). 
+
+`gst-launch-1.0 udpsrc address=127.0.0.1 port=5000 caps="application/x-rtp" ! queue ! rtph264depay ! video/x-h264,stream-format=byte-stream ! queue ! avdec_h264 ! queue ! autovideosink`
+
+## Troubleshooting
+Use chrome.

+ 94 - 0
examples/media/main.cpp

@@ -0,0 +1,94 @@
+/*
+ * libdatachannel client example
+ * Copyright (c) 2020 Staz M
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _WINSOCK_DEPRECATED_NO_WARNINGS
+
+#include "rtc/rtc.hpp"
+
+#include <iostream>
+#include <memory>
+#include <utility>
+
+#include <nlohmann/json.hpp>
+
+#ifdef _WIN32
+#include <winsock2.h>
+#else
+#include <arpa/inet.h>
+typedef int SOCKET;
+#endif
+
+using nlohmann::json;
+
+int main() {
+	rtc::InitLogger(rtc::LogLevel::Debug);
+	auto pc = std::make_shared<rtc::PeerConnection>();
+
+	pc->onStateChange(
+	    [](rtc::PeerConnection::State state) { std::cout << "State: " << state << std::endl; });
+
+	pc->onGatheringStateChange([pc](rtc::PeerConnection::GatheringState state) {
+		std::cout << "Gathering State: " << state << std::endl;
+		if (state == rtc::PeerConnection::GatheringState::Complete) {
+			auto description = pc->localDescription();
+			json message = {{"type", description->typeString()},
+			                {"sdp", std::string(description.value())}};
+			std::cout << message << std::endl;
+		}
+	});
+
+	SOCKET sock = socket(AF_INET, SOCK_DGRAM, 0);
+	sockaddr_in addr;
+	addr.sin_addr.s_addr = inet_addr("127.0.0.1");
+	addr.sin_port = htons(5000);
+	addr.sin_family = AF_INET;
+
+	rtc::Description::Video media("video", rtc::Description::RecvOnly);
+	media.addH264Codec(96);
+	media.setBitrate(
+	    3000); // Request 3Mbps (Browsers do not encode more than 2.5MBps from a webcam)
+
+	auto track = pc->createTrack(media);
+	auto dc = pc->createDataChannel("test");
+
+	auto session = std::make_shared<rtc::RtcpSession>();
+	track->setRtcpHandler(session);
+
+	track->onMessage(
+	    [session, sock, addr](rtc::binary message) {
+		    // This is an RTP packet
+		    sendto(sock, reinterpret_cast<const char *>(message.data()), message.size(), 0,
+		           reinterpret_cast<const struct sockaddr *>(&addr), sizeof(addr));
+	    },
+	    nullptr);
+
+	// TODO
+	// pc->setLocalDescription();
+
+	std::cout << "Expect RTP video traffic on localhost:5000" << std::endl;
+	std::cout << "Please copy/paste the answer provided by the browser: " << std::endl;
+	std::string sdp;
+	std::getline(std::cin, sdp);
+	std::cout << "Got answer" << sdp << std::endl;
+	json j = json::parse(sdp);
+	rtc::Description answer(j["sdp"].get<std::string>(), j["type"].get<std::string>());
+	pc->setRemoteDescription(answer);
+	std::cout << "Press any key to exit." << std::endl;
+	std::cin >> sdp;
+}

+ 45 - 0
examples/media/main.html

@@ -0,0 +1,45 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <title>Title</title>
+</head>
+<body>
+
+<p>Please enter the offer provided to you by the application: </p>
+<textarea cols="50" rows="50"></textarea>
+<button>Submit</button>
+
+<script>
+    document.querySelector('button').addEventListener('click',  async () => {
+        let offer = JSON.parse(document.querySelector('textarea').value);
+        rtc = new RTCPeerConnection({
+            // Recommended for libdatachannel
+            bundlePolicy: "max-bundle",
+        });
+
+        rtc.onicegatheringstatechange = (state) => {
+            if (rtc.iceGatheringState === 'complete') {
+                // We only want to provide an answer once all of our candidates have been added to the SDP.
+                let answer = rtc.localDescription;
+                document.querySelector('textarea').value = JSON.stringify({"type": answer.type, sdp: answer.sdp});
+                document.querySelector('p').value = 'Please paste the answer in the application.';
+                alert('Please paste the answer in the application.');
+            }
+        }
+        await rtc.setRemoteDescription(offer);
+
+        let media = await navigator.mediaDevices.getUserMedia({
+            video: {
+                width: 1280,
+                height: 720
+            }
+        });
+        media.getTracks().forEach(track => rtc.addTrack(track, media));
+        let answer = await rtc.createAnswer();
+        await rtc.setLocalDescription(answer);
+    })
+</script>
+
+</body>
+</html>

+ 137 - 29
include/rtc/description.hpp

@@ -1,5 +1,6 @@
 /**
- * Copyright (c) 2019 Paul-Louis Ageneau
+ * Copyright (c) 2019-2020 Paul-Louis Ageneau
+ * Copyright (c) 2020 Staz M
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -24,7 +25,9 @@
 
 #include <iostream>
 #include <map>
+#include <memory>
 #include <optional>
+#include <variant>
 #include <vector>
 
 namespace rtc {
@@ -33,6 +36,7 @@ class Description {
 public:
 	enum class Type { Unspec = 0, Offer = 1, Answer = 2 };
 	enum class Role { ActPass = 0, Passive = 1, Active = 2 };
+	enum Direction { SendOnly, RecvOnly, SendRecv, Unknown };
 
 	Description(const string &sdp, const string &typeString = "");
 	Description(const string &sdp, Type type);
@@ -42,54 +46,158 @@ public:
 	string typeString() const;
 	Role role() const;
 	string roleString() const;
-	string dataMid() const;
 	string bundleMid() const;
 	std::optional<string> fingerprint() const;
-	std::optional<uint16_t> sctpPort() const;
-	std::optional<size_t> maxMessageSize() const;
 	bool ended() const;
 
 	void hintType(Type type);
-	void setDataMid(string mid);
 	void setFingerprint(string fingerprint);
-	void setSctpPort(uint16_t port);
-	void setMaxMessageSize(size_t size);
 
 	void addCandidate(Candidate candidate);
 	void endCandidates();
 	std::vector<Candidate> extractCandidates();
 
-	bool hasMedia() const;
-	void addMedia(const Description &source);
-
 	operator string() const;
-	string generateSdp(const string &eol) const;
-	string generateDataSdp(const string &eol) const;
+	string generateSdp(string_view eol) const;
+	string generateApplicationSdp(string_view eol) const;
+
+	class Entry {
+	public:
+		Entry(string mline, string mid = "", Direction dir = Direction::Unknown);
+		virtual ~Entry() = default;
+
+		virtual string type() const { return mType; }
+		virtual string description() const { return mDescription; }
+		virtual string mid() const { return mMid; }
+		Direction direction() const;
+
+		virtual void parseSdpLine(string_view line);
+		virtual string generateSdp(string_view eol) const;
+
+	protected:
+		std::vector<string> mAttributes;
+		Direction mDirection;
+
+	private:
+		string mType;
+		string mDescription;
+		string mMid;
+	};
+
+	struct Application : public Entry {
+	public:
+		Application(string mid = "data");
+		Application(const Application &other) = default;
+		Application(Application &&other) = default;
+
+		string description() const override;
+		Application reciprocate() const;
+
+		void setSctpPort(uint16_t port) { mSctpPort = port; }
+		void hintSctpPort(uint16_t port) { mSctpPort = mSctpPort.value_or(port); }
+		void setMaxMessageSize(size_t size) { mMaxMessageSize = size; }
+
+		std::optional<uint16_t> sctpPort() const { return mSctpPort; }
+		std::optional<size_t> maxMessageSize() const { return mMaxMessageSize; }
+
+		virtual void parseSdpLine(string_view line) override;
+		virtual string generateSdp(string_view eol) const override;
+
+	private:
+		std::optional<uint16_t> mSctpPort;
+		std::optional<size_t> mMaxMessageSize;
+	};
+
+	// Media (non-data)
+	class Media : public Entry {
+	public:
+		Media(string mline, string mid = "media", Direction dir = Direction::SendOnly);
+		Media(const Media &other) = default;
+		Media(Media &&other) = default;
+		virtual ~Media() = default;
+
+		string description() const override;
+		Media reciprocate() const;
+
+		void removeFormat(const string &fmt);
+
+		void addVideoCodec(int payloadType, const string &codec);
+		void addH264Codec(int payloadType);
+		void addVP8Codec(int payloadType);
+		void addVP9Codec(int payloadType);
+
+		void setBitrate(int bitrate);
+		int getBitrate() const;
+
+		bool hasPayloadType(int payloadType) const;
+
+		virtual void parseSdpLine(string_view line) override;
+		virtual string generateSdp(string_view eol) const override;
+
+	private:
+		int mBas = -1;
+
+		struct RTPMap {
+			RTPMap(string_view mline);
+
+			void removeFB(const string &string);
+			void addFB(const string &string);
+
+			int pt;
+			string format;
+			int clockRate;
+			string encParams;
+
+			std::vector<string> rtcpFbs;
+			std::vector<string> fmtps;
+		};
+
+		Media::RTPMap &getFormat(int fmt);
+		Media::RTPMap &getFormat(const string &fmt);
+
+		std::map<int, RTPMap> mRtpMap;
+	};
+
+	class Audio : public Media {
+	public:
+		Audio(string mid = "audio", Direction dir = Direction::SendOnly);
+	};
+
+	class Video : public Media {
+	public:
+		Video(string mid = "video", Direction dir = Direction::SendOnly);
+	};
+
+	bool hasApplication() const;
+	bool hasAudioOrVideo() const;
+
+	int addMedia(Media media);
+	int addMedia(Application application);
+	int addApplication(string mid = "data");
+	int addVideo(string mid = "video", Direction dir = Direction::SendOnly);
+	int addAudio(string mid = "audio", Direction dir = Direction::SendOnly);
+
+	std::variant<Media *, Application *> media(int index);
+	std::variant<const Media *, const Application *> media(int index) const;
+	int mediaCount() const;
+
+	Application *application();
 
 private:
+	std::shared_ptr<Entry> createEntry(string mline, string mid, Direction dir);
+	void removeApplication();
+
 	Type mType;
+
+	// Session-level attributes
 	Role mRole;
 	string mSessionId;
 	string mIceUfrag, mIcePwd;
 	std::optional<string> mFingerprint;
 
-	// Data
-	struct Data {
-		string mid;
-		std::optional<uint16_t> sctpPort;
-		std::optional<size_t> maxMessageSize;
-	};
-	Data mData;
-
-	// Media (non-data)
-	struct Media {
-		Media(const string &mline);
-		string type;
-		string description;
-		string mid;
-		std::vector<string> attributes;
-	};
-	std::map<int, Media> mMedia; // by m-line index
+	// Entries
+	std::vector<std::shared_ptr<Entry>> mEntries;
+	std::shared_ptr<Application> mApplication;
 
 	// Candidates
 	std::vector<Candidate> mCandidates;

+ 4 - 0
include/rtc/include.hpp

@@ -41,12 +41,14 @@
 #include <mutex>
 #include <optional>
 #include <string>
+#include <string_view>
 #include <vector>
 
 namespace rtc {
 
 using std::byte;
 using std::string;
+using std::string_view;
 using binary = std::vector<byte>;
 
 using std::nullopt;
@@ -64,6 +66,8 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
 const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536;    // Remote max message size if not specified in SDP
 const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
 
+const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // Max per-channel queue size
+
 const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
 
 // overloaded helper

+ 1 - 1
include/rtc/message.hpp

@@ -73,7 +73,7 @@ message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
 
 message_ptr make_message(message_variant data);
 
-std::optional<message_variant> to_variant(Message &&message);
+message_variant to_variant(Message &&message);
 
 } // namespace rtc
 

+ 19 - 15
include/rtc/peerconnection.hpp

@@ -28,6 +28,7 @@
 #include "message.hpp"
 #include "reliability.hpp"
 #include "rtc.hpp"
+#include "track.hpp"
 
 #include <atomic>
 #include <functional>
@@ -80,13 +81,12 @@ public:
 	std::optional<string> localAddress() const;
 	std::optional<string> remoteAddress() const;
 
-	void setLocalDescription(std::optional<Description> mediaDescription = nullopt);
-	void setRemoteDescription(Description description,
-	                          std::optional<Description> mediaDescription = nullopt);
+	void setLocalDescription();
+	void setRemoteDescription(Description description);
 	void addRemoteCandidate(Candidate candidate);
 
-	std::shared_ptr<DataChannel> createDataChannel(const string &label, const string &protocol = "",
-	                                               const Reliability &reliability = {});
+	std::shared_ptr<DataChannel> createDataChannel(string label, string protocol = "",
+	                                               Reliability reliability = {});
 
 	void onDataChannel(std::function<void(std::shared_ptr<DataChannel> dataChannel)> callback);
 	void onLocalDescription(std::function<void(Description description)> callback);
@@ -100,12 +100,11 @@ public:
 	size_t bytesReceived();
 	std::optional<std::chrono::milliseconds> rtt();
 
-	// Media
+	// Media support requires compilation with SRTP
 	bool hasMedia() const;
-	void sendMedia(binary packet);
-	void sendMedia(const byte *packet, size_t size);
 
-	void onMedia(std::function<void(binary)> callback);
+	std::shared_ptr<Track> createTrack(Description::Media description);
+	void onTrack(std::function<void(std::shared_ptr<Track> track)> callback);
 
 	// libnice only
 	bool getSelectedCandidatePair(CandidateInfo *local, CandidateInfo *remote);
@@ -122,18 +121,20 @@ private:
 	void forwardMedia(message_ptr message);
 	void forwardBufferedAmount(uint16_t stream, size_t amount);
 
-	std::shared_ptr<DataChannel> emplaceDataChannel(Description::Role role, const string &label,
-	                                                const string &protocol,
-	                                                const Reliability &reliability);
+	std::shared_ptr<DataChannel> emplaceDataChannel(Description::Role role, string label,
+	                                                string protocol, Reliability reliability);
 	std::shared_ptr<DataChannel> findDataChannel(uint16_t stream);
 	void iterateDataChannels(std::function<void(std::shared_ptr<DataChannel> channel)> func);
 	void openDataChannels();
 	void closeDataChannels();
 	void remoteCloseDataChannels();
 
+	void openTracks();
+
 	void processLocalDescription(Description description);
 	void processLocalCandidate(Candidate candidate);
 	void triggerDataChannel(std::weak_ptr<DataChannel> weakDataChannel);
+	void triggerTrack(std::weak_ptr<Track> weakTrack);
 	bool changeState(State state);
 	bool changeGatheringState(GatheringState state);
 
@@ -153,8 +154,11 @@ private:
 	std::shared_ptr<DtlsTransport> mDtlsTransport;
 	std::shared_ptr<SctpTransport> mSctpTransport;
 
-	std::unordered_map<unsigned int, std::weak_ptr<DataChannel>> mDataChannels;
-	std::shared_mutex mDataChannelsMutex;
+	std::unordered_map<unsigned int, std::weak_ptr<DataChannel>> mDataChannels; // by stream ID
+	std::unordered_map<string, std::weak_ptr<Track>> mTracks;                   // by mid
+	std::shared_mutex mDataChannelsMutex, mTracksMutex;
+
+	std::unordered_map<unsigned int, string> mMidFromPayloadType; // cache
 
 	std::atomic<State> mState;
 	std::atomic<GatheringState> mGatheringState;
@@ -164,7 +168,7 @@ private:
 	synchronized_callback<Candidate> mLocalCandidateCallback;
 	synchronized_callback<State> mStateChangeCallback;
 	synchronized_callback<GatheringState> mGatheringStateChangeCallback;
-	synchronized_callback<binary> mMediaCallback;
+	synchronized_callback<std::shared_ptr<Track>> mTrackCallback;
 };
 
 } // namespace rtc

+ 6 - 0
include/rtc/queue.hpp

@@ -39,6 +39,7 @@ public:
 
 	void stop();
 	bool empty() const;
+	bool full() const;
 	size_t size() const;   // elements
 	size_t amount() const; // amount
 	void push(T element);
@@ -80,6 +81,11 @@ template <typename T> bool Queue<T>::empty() const {
 	return mQueue.empty();
 }
 
+template <typename T> bool Queue<T>::full() const {
+	std::lock_guard lock(mMutex);
+	return mQueue.size() >= mLimit;
+}
+
 template <typename T> size_t Queue<T>::size() const {
 	std::lock_guard lock(mMutex);
 	return mQueue.size();

+ 1 - 0
include/rtc/rtp.hp

@@ -0,0 +1 @@
+

+ 459 - 0
include/rtc/rtp.hpp

@@ -0,0 +1,459 @@
+/**
+ * Copyright (c) 2020 Staz M
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef RTC_RTPL_H
+#define RTC_RTPL_H
+
+#include "include.hpp"
+#include "log.hpp"
+#include "message.hpp"
+
+#include <cmath>
+#include <functional>
+#include <iostream>
+#include <utility>
+
+#ifdef _WIN32
+#include <winsock2.h>
+#else
+#include <arpa/inet.h>
+#endif
+
+#ifndef htonll
+#define htonll(x)                                                                                  \
+	((uint64_t)htonl(((uint64_t)(x)&0xFFFFFFFF) << 32) | (uint64_t)htonl((uint64_t)(x) >> 32))
+#endif
+#ifndef ntohll
+#define ntohll(x) htonll(x)
+#endif
+
+namespace rtc {
+
+using std::size_t;
+
+typedef uint32_t SSRC;
+
+#pragma pack(push, 1)
+
+struct RTCP_ReportBlock {
+	SSRC ssrc;
+
+private:
+	uint32_t _fractionLostAndPacketsLost; // fraction lost is 8-bit, packets lost is 24-bit
+	uint16_t _seqNoCycles;
+	uint16_t _highestSeqNo;
+	uint32_t _jitter;
+	uint32_t _lastReport;
+	uint32_t _delaySinceLastReport;
+
+public:
+	void print() {
+		std::cout << " ssrc:" << ntohl(ssrc) <<
+		    // TODO Implement these reports
+		    //			  " fractionLost: " << fractionLost <<
+		    //			  " packetsLost: " << packetsLost <<
+		    " highestSeqNo:" << highestSeqNo() << " seqNoCycles:" << seqNoCycles()
+		          << " jitter:" << jitter() << " lastSR:" << getNTPOfSR()
+		          << " lastSRDelay:" << getDelaySinceSR();
+	}
+
+	void preparePacket(SSRC ssrc, [[maybe_unused]] unsigned int packetsLost,
+	                   [[maybe_unused]] unsigned int totalPackets, uint16_t highestSeqNo,
+	                   uint16_t seqNoCycles, uint32_t jitter, uint64_t lastSR_NTP,
+	                   uint64_t lastSR_DELAY) {
+		setSeqNo(highestSeqNo, seqNoCycles);
+		setJitter(jitter);
+		setSSRC(ssrc);
+
+		// Middle 32 bits of NTP Timestamp
+		//		  this->lastReport = lastSR_NTP >> 16u;
+		setNTPOfSR(uint32_t(lastSR_NTP));
+		setDelaySinceSR(uint32_t(lastSR_DELAY));
+
+		// The delay, expressed in units of 1/65536 seconds
+		//		  this->delaySinceLastReport = lastSR_DELAY;
+	}
+
+	void inline setSSRC(SSRC ssrc) { this->ssrc = htonl(ssrc); }
+	SSRC inline getSSRC() const { return ntohl(ssrc); }
+
+	void inline setPacketsLost([[maybe_unused]] unsigned int packetsLost,
+	                           [[maybe_unused]] unsigned int totalPackets) {
+		// TODO Implement loss percentages.
+		_fractionLostAndPacketsLost = 0;
+	}
+	unsigned int inline getLossPercentage() const {
+		// TODO Implement loss percentages.
+		return 0;
+	}
+	unsigned int inline getPacketLostCount() const {
+		// TODO Implement total packets lost.
+		return 0;
+	}
+
+	uint16_t inline seqNoCycles() const { return ntohs(_seqNoCycles); }
+	uint16_t inline highestSeqNo() const { return ntohs(_highestSeqNo); }
+	uint32_t inline jitter() const { return ntohl(_jitter); }
+
+	void inline setSeqNo(uint16_t highestSeqNo, uint16_t seqNoCycles) {
+		_highestSeqNo = htons(highestSeqNo);
+		_seqNoCycles = htons(seqNoCycles);
+	}
+
+	void inline setJitter(uint32_t jitter) { _jitter = htonl(jitter); }
+
+	void inline setNTPOfSR(uint32_t ntp) { _lastReport = htonl(ntp >> 16u); }
+	inline uint32_t getNTPOfSR() const { return ntohl(_lastReport) << 16u; }
+
+	inline void setDelaySinceSR(uint32_t sr) {
+		// The delay, expressed in units of 1/65536 seconds
+		_delaySinceLastReport = htonl(sr);
+	}
+	inline uint32_t getDelaySinceSR() const { return ntohl(_delaySinceLastReport); }
+};
+
+struct RTCP_HEADER {
+private:
+	uint8_t _first;
+	uint8_t _payloadType;
+	uint16_t _length;
+
+public:
+	inline uint8_t version() const { return _first >> 6; }
+	inline bool padding() const { return (_first >> 5) & 0x01; }
+	inline uint8_t reportCount() const { return _first & 0x0F; }
+	inline uint8_t payloadType() const { return _payloadType; }
+	inline uint16_t length() const { return ntohs(_length); }
+
+	inline void setPayloadType(uint8_t type) { _payloadType = type; }
+	inline void setReportCount(uint8_t count) { _first = (_first & 0xF0) | (count & 0x0F); }
+	inline void setLength(uint16_t length) { _length = htons(length); }
+
+	void prepareHeader(uint8_t payloadType, uint8_t reportCount, uint16_t length) {
+		_first = 0x02 << 6; // version 2, no padding
+		setReportCount(reportCount);
+		setPayloadType(payloadType);
+		setLength(length);
+	}
+
+	void print() {
+		std::cout << "version:" << unsigned(version()) << " padding:" << (padding() ? "T" : "F")
+		          << " reportCount: " << unsigned(reportCount())
+		          << " payloadType:" << unsigned(payloadType()) << " length: " << length();
+	}
+};
+
+struct RTCP_SR {
+	RTCP_HEADER header;
+	SSRC senderSSRC;
+
+private:
+	uint64_t _ntpTimestamp;
+	uint32_t _rtpTimestamp;
+	uint32_t _packetCount;
+	uint32_t _octetCount;
+
+	RTCP_ReportBlock _reportBlocks;
+
+public:
+	void print() {
+		std::cout << "SR ";
+		header.print();
+		std::cout << " SSRC:" << ntohl(senderSSRC) << " NTP TS: " << ntpTimestamp()
+		          << " RTP TS: " << rtpTimestamp() << " packetCount: " << packetCount()
+		          << " octetCount: " << octetCount() << std::endl;
+
+		for (unsigned i = 0; i < unsigned(header.reportCount()); i++) {
+			getReportBlock(i)->print();
+			std::cout << std::endl;
+		}
+	}
+
+	inline void preparePacket(SSRC senderSSRC, uint8_t reportCount) {
+		unsigned int length =
+		    ((sizeof(header) + 24 + reportCount * sizeof(RTCP_ReportBlock)) / 4) - 1;
+		header.prepareHeader(200, reportCount, uint16_t(length));
+		this->senderSSRC = htonl(senderSSRC);
+	}
+
+	RTCP_ReportBlock *getReportBlock(int num) { return &_reportBlocks + num; }
+
+	[[nodiscard]] size_t getSize() const {
+		// "length" in packet is one less than the number of 32 bit words in the packet.
+		return sizeof(uint32_t) * (1 + size_t(header.length()));
+	}
+
+	inline uint32_t ntpTimestamp() const { return ntohll(_ntpTimestamp); }
+	inline uint32_t rtpTimestamp() const { return ntohl(_rtpTimestamp); }
+	inline uint32_t packetCount() const { return ntohl(_packetCount); }
+	inline uint32_t octetCount() const { return ntohl(_octetCount); }
+
+	inline void setNtpTimestamp(uint32_t ts) { _ntpTimestamp = htonll(ts); }
+	inline void setRtpTimestamp(uint32_t ts) { _rtpTimestamp = htonl(ts); }
+};
+
+struct RTCP_RR {
+	RTCP_HEADER header;
+	SSRC senderSSRC;
+
+private:
+	RTCP_ReportBlock _reportBlocks;
+
+public:
+	void print() {
+		std::cout << "RR ";
+		header.print();
+		std::cout << " SSRC:" << ntohl(senderSSRC) << std::endl;
+
+		for (unsigned i = 0; i < unsigned(header.reportCount()); i++) {
+			getReportBlock(i)->print();
+			std::cout << std::endl;
+		}
+	}
+	RTCP_ReportBlock *getReportBlock(int num) { return &_reportBlocks + num; }
+
+	inline SSRC getSenderSSRC() const { return ntohl(senderSSRC); }
+	inline void setSenderSSRC(SSRC ssrc) { this->senderSSRC = htonl(ssrc); }
+
+	[[nodiscard]] inline size_t getSize() const {
+		// "length" in packet is one less than the number of 32 bit words in the packet.
+		return sizeof(uint32_t) * (1 + size_t(header.length()));
+	}
+
+	inline void preparePacket(SSRC senderSSRC, uint8_t reportCount) {
+		// "length" in packet is one less than the number of 32 bit words in the packet.
+		size_t length = (sizeWithReportBlocks(reportCount) / 4) - 1;
+		header.prepareHeader(201, reportCount, uint16_t(length));
+		this->senderSSRC = htonl(senderSSRC);
+	}
+
+	inline static size_t sizeWithReportBlocks(uint8_t reportCount) {
+		return sizeof(header) + 4 + size_t(reportCount) * sizeof(RTCP_ReportBlock);
+	}
+};
+
+struct RTP
+{
+private:
+	uint8_t _first;
+	uint8_t _payloadType;
+	uint16_t _seqNumber;
+	uint32_t _timestamp;
+
+public:
+	SSRC ssrc;
+	SSRC csrc[16];
+
+	inline uint8_t version() const { return _first >> 6; }
+	inline bool padding() const { return (_first >> 5) & 0x01; }
+	inline uint8_t csrcCount() const { return _first & 0x0F; }
+	inline uint8_t payloadType() const { return _payloadType; }
+	inline uint16_t seqNumber() const { return ntohs(_seqNumber); }
+	inline uint32_t timestamp() const { return ntohl(_timestamp); }
+};
+
+struct RTCP_REMB {
+	RTCP_HEADER header;
+
+	SSRC senderSSRC;
+	SSRC mediaSourceSSRC;
+
+	// Unique identifier
+	const char id[4] = {'R', 'E', 'M', 'B'};
+
+	// Num SSRC, Br Exp, Br Mantissa (bit mask)
+	uint32_t bitrate;
+
+	SSRC ssrc[1];
+
+	[[nodiscard]] size_t getSize() const {
+		// "length" in packet is one less than the number of 32 bit words in the packet.
+		return sizeof(uint32_t) * (1 + size_t(header.length()));
+	}
+
+	void preparePacket(SSRC senderSSRC, unsigned int numSSRC, unsigned int bitrate) {
+		// Report Count becomes the format here.
+		header.prepareHeader(206, 15, 0);
+
+		// Always zero.
+		mediaSourceSSRC = 0;
+
+		this->senderSSRC = htonl(senderSSRC);
+		setBitrate(numSSRC, bitrate);
+	}
+
+	void setBitrate(unsigned int numSSRC, unsigned int bitrate) {
+		unsigned int exp = 0;
+		while (bitrate > pow(2, 18) - 1) {
+			exp++;
+			bitrate /= 2;
+		}
+
+		// "length" in packet is one less than the number of 32 bit words in the packet.
+		header.setLength(uint16_t((offsetof(RTCP_REMB, ssrc) / 4) - 1 + numSSRC));
+
+		this->bitrate = htonl((numSSRC << (32u - 8u)) | (exp << (32u - 8u - 6u)) | bitrate);
+	}
+
+	// TODO Make this work
+	//	  uint64_t getBitrate() const{
+	//		  uint32_t ntohed = ntohl(this->bitrate);
+	//		  uint64_t bitrate = ntohed & (unsigned int)(pow(2, 18)-1);
+	//		  unsigned int exp = ntohed & ((unsigned int)( (pow(2, 6)-1)) << (32u-8u-6u));
+	//		  return bitrate * pow(2,exp);
+	//	  }
+	//
+	//	  uint8_t getNumSSRCS() const {
+	//		  return ntohl(this->bitrate) & (((unsigned int) pow(2,8)-1) << (32u-8u));
+	//	  }
+
+	void print() {
+		std::cout << "REMB ";
+		header.print();
+		std::cout << " SSRC:" << ntohl(senderSSRC);
+	}
+
+	void setSSRC(uint8_t iterator, SSRC ssrc) { this->ssrc[iterator] = htonl(ssrc); }
+
+	static unsigned int sizeWithSSRCs(int numSSRC) {
+		return (offsetof(RTCP_REMB, ssrc)) + sizeof(SSRC) * numSSRC;
+	}
+};
+
+#pragma pack(pop)
+
+class RtcpHandler {
+public:
+	virtual void onOutgoing(std::function<void(rtc::message_ptr)> cb) = 0;
+	virtual std::optional<rtc::message_ptr> incoming(rtc::message_ptr ptr) = 0;
+};
+
+class RtcpSession : public RtcpHandler {
+private:
+	std::function<void(RTP)> onPacketCB;
+	unsigned int requestedBitrate = 0;
+	synchronized_callback<rtc::message_ptr> txCB;
+	SSRC ssrc = 0;
+	uint32_t greatestSeqNo = 0;
+	uint64_t syncRTPTS, syncNTPTS;
+
+public:
+	void onOutgoing(std::function<void(rtc::message_ptr)> cb) override { txCB = cb; }
+
+	std::optional<rtc::message_ptr> incoming(rtc::message_ptr ptr) override {
+		if (ptr->type == rtc::Message::Type::Binary) {
+			RTP *rtp = (RTP *)ptr->data();
+
+			// https://tools.ietf.org/html/rfc3550#appendix-A.1
+			if (rtp->version() != 2) {
+				PLOG_WARNING << "RTP packet is not version 2";
+
+				return std::nullopt;
+			}
+			if (rtp->payloadType() == 201 || rtp->payloadType() == 200) {
+				PLOG_WARNING << "RTP packet has a payload type indicating RR/SR";
+
+				return std::nullopt;
+			}
+
+			// TODO Implement the padding bit
+			if (rtp->padding()) {
+				PLOG_WARNING << "Padding processing not implemented";
+			}
+
+			ssrc = ntohl(rtp->ssrc);
+
+			uint32_t seqNo = rtp->seqNumber();
+			// uint32_t rtpTS = rtp->getTS();
+
+			if (greatestSeqNo < seqNo)
+				greatestSeqNo = seqNo;
+
+			return ptr;
+		}
+
+		assert(ptr->type == rtc::Message::Type::Control);
+		auto rr = (RTCP_RR *)ptr->data();
+		if (rr->header.payloadType() == 201) {
+			// RR
+			ssrc = rr->getSenderSSRC();
+			rr->print();
+			std::cout << std::endl;
+		} else if (rr->header.payloadType() == 200) {
+			// SR
+			ssrc = rr->getSenderSSRC();
+			auto sr = (RTCP_SR *)ptr->data();
+			syncRTPTS = sr->rtpTimestamp();
+			syncNTPTS = sr->ntpTimestamp();
+			sr->print();
+			std::cout << std::endl;
+
+			// TODO For the time being, we will send RR's/REMB's when we get an SR
+			pushRR(0);
+			if (requestedBitrate > 0)
+				pushREMB(requestedBitrate);
+		}
+		return std::nullopt;
+	}
+
+	void requestBitrate(unsigned int newBitrate) {
+		this->requestedBitrate = newBitrate;
+
+		PLOG_DEBUG << "[GOOG-REMB] Requesting bitrate: " << newBitrate << std::endl;
+		pushREMB(newBitrate);
+	}
+
+private:
+	void pushREMB(unsigned int bitrate) {
+		rtc::message_ptr msg =
+		    rtc::make_message(RTCP_REMB::sizeWithSSRCs(1), rtc::Message::Type::Control);
+		auto remb = (RTCP_REMB *)msg->data();
+		remb->preparePacket(ssrc, 1, bitrate);
+		remb->setSSRC(0, ssrc);
+		remb->print();
+		std::cout << std::endl;
+
+		tx(msg);
+	}
+
+	void pushRR(unsigned int lastSR_delay) {
+		// std::cout << "size " << RTCP_RR::sizeWithReportBlocks(1) << std::endl;
+		auto msg = rtc::make_message(RTCP_RR::sizeWithReportBlocks(1), rtc::Message::Type::Control);
+		auto rr = (RTCP_RR *)msg->data();
+		rr->preparePacket(ssrc, 1);
+		rr->getReportBlock(0)->preparePacket(ssrc, 0, 0, greatestSeqNo, 0, 0, syncNTPTS,
+		                                     lastSR_delay);
+		rr->print();
+		std::cout << std::endl;
+
+		tx(msg);
+	}
+
+	void tx(message_ptr msg) {
+		try {
+			txCB(msg);
+		} catch (const std::exception &e) {
+			LOG_DEBUG << "RTCP tx failed: " << e.what();
+		}
+	}
+};
+
+} // namespace rtc
+
+#endif // RTC_RTPL_H

+ 82 - 0
include/rtc/track.hpp

@@ -0,0 +1,82 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef RTC_TRACK_H
+#define RTC_TRACK_H
+
+#include "channel.hpp"
+#include "description.hpp"
+#include "include.hpp"
+#include "message.hpp"
+#include "queue.hpp"
+#include "rtp.hpp"
+
+#include <atomic>
+#include <variant>
+
+namespace rtc {
+
+#if RTC_ENABLE_MEDIA
+class DtlsSrtpTransport;
+#endif
+
+class Track final : public std::enable_shared_from_this<Track>, public Channel {
+public:
+	Track(Description::Media description);
+	~Track() = default;
+
+	string mid() const;
+	Description::Media description() const;
+
+	void close(void) override;
+	bool send(message_variant data) override;
+	bool send(const byte *data, size_t size);
+
+	bool isOpen(void) const override;
+	bool isClosed(void) const override;
+	size_t maxMessageSize() const override;
+
+	// Extended API
+	size_t availableAmount() const override;
+	std::optional<message_variant> receive() override;
+
+	// RTCP handler
+	void setRtcpHandler(std::shared_ptr<RtcpHandler> handler);
+
+private:
+#if RTC_ENABLE_MEDIA
+	void open(std::shared_ptr<DtlsSrtpTransport> transport);
+	std::weak_ptr<DtlsSrtpTransport> mDtlsSrtpTransport;
+#endif
+
+	bool outgoing(message_ptr message);
+	void incoming(message_ptr message);
+
+	Description::Media mMediaDescription;
+	std::atomic<bool> mIsClosed = false;
+
+	Queue<message_ptr> mRecvQueue;
+	std::shared_ptr<RtcpHandler> mRtcpHandler;
+
+	friend class PeerConnection;
+};
+
+} // namespace rtc
+
+#endif
+

+ 6 - 8
src/datachannel.cpp

@@ -72,8 +72,6 @@ struct CloseMessage {
 };
 #pragma pack(pop)
 
-const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // 1 MiB
-
 DataChannel::DataChannel(weak_ptr<PeerConnection> pc, unsigned int stream, string label,
                          string protocol, Reliability reliability)
     : mPeerConnection(pc), mStream(stream), mLabel(std::move(label)),
@@ -130,10 +128,9 @@ std::optional<message_variant> DataChannel::receive() {
 			auto raw = reinterpret_cast<const uint8_t *>(message->data());
 			if (!message->empty() && raw[0] == MESSAGE_CLOSE)
 				remoteClose();
-			continue;
+		} else {
+			return to_variant(std::move(*message));
 		}
-		if (auto variant = to_variant(std::move(*message)))
-			return variant;
 	}
 
 	return nullopt;
@@ -147,8 +144,9 @@ size_t DataChannel::maxMessageSize() const {
 	size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
 	if (auto pc = mPeerConnection.lock())
 		if (auto description = pc->remoteDescription())
-			if (auto maxMessageSize = description->maxMessageSize())
-				remoteMax = *maxMessageSize > 0 ? *maxMessageSize : LOCAL_MAX_MESSAGE_SIZE;
+			if (auto *application = description->application())
+				if (auto maxMessageSize = application->maxMessageSize())
+					remoteMax = *maxMessageSize > 0 ? *maxMessageSize : LOCAL_MAX_MESSAGE_SIZE;
 
 	return std::min(remoteMax, LOCAL_MAX_MESSAGE_SIZE);
 }
@@ -206,7 +204,7 @@ bool DataChannel::outgoing(message_ptr message) {
 
 	auto transport = mSctpTransport.lock();
 	if (!transport)
-		throw std::runtime_error("DataChannel has no transport");
+		throw std::runtime_error("DataChannel transport is not open");
 
 	// Before the ACK has been received on a DataChannel, all messages must be sent ordered
 	message->reliability = mIsOpen ? mReliability : nullptr;

+ 471 - 128
src/description.cpp

@@ -1,5 +1,6 @@
 /**
- * Copyright (c) 2019 Paul-Louis Ageneau
+ * Copyright (c) 2019-2020 Paul-Louis Ageneau
+ * Copyright (c) 2020 Staz M
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -21,16 +22,19 @@
 #include <algorithm>
 #include <cctype>
 #include <chrono>
+#include <iostream>
 #include <random>
 #include <sstream>
 
+using std::shared_ptr;
 using std::size_t;
 using std::string;
+using std::string_view;
 using std::chrono::system_clock;
 
 namespace {
 
-inline bool match_prefix(const string &str, const string &prefix) {
+inline bool match_prefix(string_view str, string_view prefix) {
 	return str.size() >= prefix.size() &&
 	       std::mismatch(prefix.begin(), prefix.end(), str.begin()).first == prefix.end();
 }
@@ -41,6 +45,21 @@ inline void trim_end(string &str) {
 	    str.end());
 }
 
+inline std::pair<string_view, string_view> parse_pair(string_view attr) {
+	string_view key, value;
+	if (size_t separator = attr.find(':'); separator != string::npos) {
+		key = attr.substr(0, separator);
+		value = attr.substr(separator + 1);
+	} else {
+		key = attr;
+	}
+	return std::make_pair(std::move(key), std::move(value));
+}
+
+template <typename T> T to_integer(string_view s) {
+	return std::is_signed<T>::value ? T(std::stol(string(s))) : T(std::stoul(string(s)));
+}
+
 } // namespace
 
 namespace rtc {
@@ -52,7 +71,6 @@ Description::Description(const string &sdp, Type type) : Description(sdp, type,
 
 Description::Description(const string &sdp, Type type, Role role)
     : mType(Type::Unspec), mRole(role) {
-	mData.mid = "data";
 	hintType(type);
 
 	auto seed = static_cast<unsigned int>(system_clock::now().time_since_epoch().count());
@@ -61,50 +79,25 @@ Description::Description(const string &sdp, Type type, Role role)
 	mSessionId = std::to_string(uniform(generator));
 
 	std::istringstream ss(sdp);
-	std::optional<Media> currentMedia;
+	std::shared_ptr<Entry> current;
 
-	int mlineIndex = 0;
-	bool finished;
-	do {
-		string line;
-		finished = !std::getline(ss, line) && line.empty();
+	int index = -1;
+	string line;
+	while (std::getline(ss, line) || !line.empty()) {
 		trim_end(line);
 
 		// Media description line (aka m-line)
-		if (finished || match_prefix(line, "m=")) {
-			if (currentMedia) {
-				if (!currentMedia->mid.empty()) {
-					if (currentMedia->type == "application")
-						mData.mid = currentMedia->mid;
-					else
-						mMedia.emplace(mlineIndex, std::move(*currentMedia));
-
-					++mlineIndex;
-
-				} else if (line.find(" ICE/SDP") != string::npos) {
-					PLOG_WARNING << "SDP \"m=\" line has no corresponding mid, ignoring";
-				}
-			}
-			if (!finished)
-				currentMedia.emplace(Media(line.substr(2)));
+		if (match_prefix(line, "m=")) {
+			++index;
+			string mline = line.substr(2);
+			current = createEntry(std::move(mline), std::to_string(index), Direction::Unknown);
 
 			// Attribute line
 		} else if (match_prefix(line, "a=")) {
 			string attr = line.substr(2);
+			auto [key, value] = parse_pair(attr);
 
-			string key, value;
-			if (size_t separator = attr.find(':'); separator != string::npos) {
-				key = attr.substr(0, separator);
-				value = attr.substr(separator + 1);
-			} else {
-				key = attr;
-			}
-
-			if (key == "mid") {
-				if (currentMedia)
-					currentMedia->mid = value;
-
-			} else if (key == "setup") {
+			if (key == "setup") {
 				if (value == "active")
 					mRole = Role::Active;
 				else if (value == "passive")
@@ -125,19 +118,18 @@ Description::Description(const string &sdp, Type type, Role role)
 				mIceUfrag = value;
 			} else if (key == "ice-pwd") {
 				mIcePwd = value;
-			} else if (key == "sctp-port") {
-				mData.sctpPort = uint16_t(std::stoul(value));
-			} else if (key == "max-message-size") {
-				mData.maxMessageSize = size_t(std::stoul(value));
 			} else if (key == "candidate") {
-				addCandidate(Candidate(attr, currentMedia ? currentMedia->mid : mData.mid));
+				addCandidate(Candidate(attr, bundleMid()));
 			} else if (key == "end-of-candidates") {
 				mEnded = true;
-			} else if (currentMedia) {
-				currentMedia->attributes.emplace_back(line.substr(2));
+			} else if (current) {
+				current->parseSdpLine(std::move(line));
 			}
+
+		} else if (current) {
+			current->parseSdpLine(std::move(line));
 		}
-	} while (!finished);
+	};
 }
 
 Description::Type Description::type() const { return mType; }
@@ -148,22 +140,13 @@ Description::Role Description::role() const { return mRole; }
 
 string Description::roleString() const { return roleToString(mRole); }
 
-string Description::dataMid() const { return mData.mid; }
-
 string Description::bundleMid() const {
 	// Get the mid of the first media
-	if (auto it = mMedia.find(0); it != mMedia.end())
-		return it->second.mid;
-	else
-		return mData.mid;
+	return !mEntries.empty() ? mEntries[0]->mid() : "0";
 }
 
 std::optional<string> Description::fingerprint() const { return mFingerprint; }
 
-std::optional<uint16_t> Description::sctpPort() const { return mData.sctpPort; }
-
-std::optional<size_t> Description::maxMessageSize() const { return mData.maxMessageSize; }
-
 bool Description::ended() const { return mEnded; }
 
 void Description::hintType(Type type) {
@@ -174,16 +157,10 @@ void Description::hintType(Type type) {
 	}
 }
 
-void Description::setDataMid(string mid) { mData.mid = mid; }
-
 void Description::setFingerprint(string fingerprint) {
 	mFingerprint.emplace(std::move(fingerprint));
 }
 
-void Description::setSctpPort(uint16_t port) { mData.sctpPort.emplace(port); }
-
-void Description::setMaxMessageSize(size_t size) { mData.maxMessageSize.emplace(size); }
-
 void Description::addCandidate(Candidate candidate) {
 	mCandidates.emplace_back(std::move(candidate));
 }
@@ -197,16 +174,9 @@ std::vector<Candidate> Description::extractCandidates() {
 	return result;
 }
 
-bool Description::hasMedia() const { return !mMedia.empty(); }
-
-void Description::addMedia(const Description &source) {
-	for (auto p : source.mMedia)
-		mMedia.emplace(p);
-}
-
 Description::operator string() const { return generateSdp("\r\n"); }
 
-string Description::generateSdp(const string &eol) const {
+string Description::generateSdp(string_view eol) const {
 	std::ostringstream sdp;
 
 	// Header
@@ -219,21 +189,18 @@ string Description::generateSdp(const string &eol) const {
 	// see Negotiating Media Multiplexing Using the Session Description Protocol
 	// https://tools.ietf.org/html/draft-ietf-mmusic-sdp-bundle-negotiation-54
 	sdp << "a=group:BUNDLE";
-	for (int i = 0; i < int(mMedia.size() + 1); ++i)
-		if (auto it = mMedia.find(i); it != mMedia.end())
-			sdp << ' ' << it->second.mid;
-		else
-			sdp << ' ' << mData.mid;
+	for (const auto &entry : mEntries)
+		sdp << ' ' << entry->mid();
 	sdp << eol;
 
-	// Non-data media
-	if (!mMedia.empty()) {
-		// Lip-sync
-		sdp << "a=group:LS";
-		for (const auto &p : mMedia)
-			sdp << " " << p.second.mid;
-		sdp << eol;
-	}
+	// Lip-sync
+	std::ostringstream lsGroup;
+	for (const auto &entry : mEntries)
+		if (entry != mApplication)
+			lsGroup << ' ' << entry->mid();
+
+	if (!lsGroup.str().empty())
+		sdp << "a=group:LS" << lsGroup.str() << eol;
 
 	// Session-level attributes
 	sdp << "a=msid-semantic:WMS *" << eol;
@@ -247,45 +214,25 @@ string Description::generateSdp(const string &eol) const {
 	if (mFingerprint)
 		sdp << "a=fingerprint:sha-256 " << *mFingerprint << eol;
 
-	// Media descriptions and attributes
-	for (int i = 0; i < int(mMedia.size() + 1); ++i) {
-		if (auto it = mMedia.find(i); it != mMedia.end()) {
-			// Non-data media
-			const auto &media = it->second;
-			sdp << "m=" << media.type << ' ' << 0 << ' ' << media.description << eol;
-			sdp << "c=IN IP4 0.0.0.0" << eol;
-			sdp << "a=bundle-only" << eol;
-			sdp << "a=mid:" << media.mid << eol;
-			for (const auto &attr : media.attributes)
-				sdp << "a=" << attr << eol;
+	// Entries
+	bool first = true;
+	for (const auto &entry : mEntries) {
+		sdp << entry->generateSdp(eol);
 
-		} else {
-			// Data
-			const string description = "UDP/DTLS/SCTP webrtc-datachannel";
-			sdp << "m=application" << ' ' << (!mMedia.empty() ? 0 : 9) << ' ' << description << eol;
-			sdp << "c=IN IP4 0.0.0.0" << eol;
-			if (!mMedia.empty())
-				sdp << "a=bundle-only" << eol;
-			sdp << "a=mid:" << mData.mid << eol;
-			sdp << "a=sendrecv" << eol;
-			if (mData.sctpPort)
-				sdp << "a=sctp-port:" << *mData.sctpPort << eol;
-			if (mData.maxMessageSize)
-				sdp << "a=max-message-size:" << *mData.maxMessageSize << eol;
+		if (std::exchange(first, false)) {
+			// Candidates
+			for (const auto &candidate : mCandidates)
+				sdp << string(candidate) << eol;
+
+			if (mEnded)
+				sdp << "a=end-of-candidates" << eol;
 		}
 	}
 
-	// Candidates
-	for (const auto &candidate : mCandidates)
-		sdp << string(candidate) << eol;
-
-	if (mEnded)
-		sdp << "a=end-of-candidates" << eol;
-
 	return sdp.str();
 }
 
-string Description::generateDataSdp(const string &eol) const {
+string Description::generateApplicationSdp(string_view eol) const {
 	std::ostringstream sdp;
 
 	// Header
@@ -294,16 +241,12 @@ string Description::generateDataSdp(const string &eol) const {
 	sdp << "s=-" << eol;
 	sdp << "t=0 0" << eol;
 
-	// Data
-	sdp << "m=application 9 UDP/DTLS/SCTP webrtc-datachannel";
-	sdp << "c=IN IP4 0.0.0.0" << eol;
-	sdp << "a=mid:" << mData.mid << eol;
-	sdp << "a=sendrecv" << eol;
-	if (mData.sctpPort)
-		sdp << "a=sctp-port:" << *mData.sctpPort << eol;
-	if (mData.maxMessageSize)
-		sdp << "a=max-message-size:" << *mData.maxMessageSize << eol;
+	// Application
+	auto app = mApplication ? mApplication : std::make_shared<Application>();
+	sdp << app->generateSdp(eol);
 
+	// Session-level attributes
+	sdp << "a=msid-semantic:WMS *" << eol;
 	sdp << "a=setup:" << roleToString(mRole) << eol;
 	sdp << "a=ice-ufrag:" << mIceUfrag << eol;
 	sdp << "a=ice-pwd:" << mIcePwd << eol;
@@ -324,14 +267,415 @@ string Description::generateDataSdp(const string &eol) const {
 	return sdp.str();
 }
 
-Description::Media::Media(const string &mline) {
+shared_ptr<Description::Entry> Description::createEntry(string mline, string mid, Direction dir) {
+	string type = mline.substr(0, mline.find(' '));
+	if (type == "application") {
+		removeApplication();
+		mApplication = std::make_shared<Application>(std::move(mid));
+		mEntries.emplace_back(mApplication);
+		return mApplication;
+	} else {
+		auto media = std::make_shared<Media>(std::move(mline), std::move(mid), dir);
+		mEntries.emplace_back(media);
+		return media;
+	}
+}
+
+void Description::removeApplication() {
+	if (!mApplication)
+		return;
+
+	auto it = std::find(mEntries.begin(), mEntries.end(), mApplication);
+	if (it != mEntries.end())
+		mEntries.erase(it);
+
+	mApplication.reset();
+}
+
+bool Description::hasApplication() const { return mApplication != nullptr; }
+
+bool Description::hasAudioOrVideo() const {
+	for (auto entry : mEntries)
+		if (entry != mApplication)
+			return true;
+
+	return false;
+}
+
+int Description::addMedia(Media media) {
+	mEntries.emplace_back(std::make_shared<Media>(std::move(media)));
+	return int(mEntries.size()) - 1;
+}
+
+int Description::addMedia(Application application) {
+	removeApplication();
+	mApplication = std::make_shared<Application>(std::move(application));
+	mEntries.emplace_back(mApplication);
+	return int(mEntries.size()) - 1;
+}
+
+int Description::addApplication(string mid) { return addMedia(Application(std::move(mid))); }
+
+Description::Application *Description::application() { return mApplication.get(); }
+
+int Description::addVideo(string mid, Direction dir) {
+	return addMedia(Video(std::move(mid), dir));
+}
+
+int Description::addAudio(string mid, Direction dir) {
+	return addMedia(Audio(std::move(mid), dir));
+}
+
+std::variant<Description::Media *, Description::Application *> Description::media(int index) {
+	if (index < 0 || index >= int(mEntries.size()))
+		throw std::out_of_range("Media index out of range");
+
+	const auto &entry = mEntries[index];
+	if (entry == mApplication) {
+		auto result = dynamic_cast<Application *>(entry.get());
+		if (!result)
+			throw std::logic_error("Bad type of application in description");
+		return result;
+	} else {
+		auto result = dynamic_cast<Media *>(entry.get());
+		if (!result)
+			throw std::logic_error("Bad type of media in description");
+		return result;
+	}
+}
+
+std::variant<const Description::Media *, const Description::Application *>
+Description::media(int index) const {
+	if (index < 0 || index >= int(mEntries.size()))
+		throw std::out_of_range("Media index out of range");
+
+	const auto &entry = mEntries[index];
+	if (entry == mApplication) {
+		auto result = dynamic_cast<Application *>(entry.get());
+		if (!result)
+			throw std::logic_error("Bad type of application in description");
+		return result;
+	} else {
+		auto result = dynamic_cast<Media *>(entry.get());
+		if (!result)
+			throw std::logic_error("Bad type of media in description");
+		return result;
+	}
+}
+
+int Description::mediaCount() const { return int(mEntries.size()); }
+
+Description::Entry::Entry(string mline, string mid, Direction dir)
+    : mDirection(dir), mMid(std::move(mid)) {
 	size_t p = mline.find(' ');
-	this->type = mline.substr(0, p);
+	mType = mline.substr(0, p);
 	if (p != string::npos)
 		if (size_t q = mline.find(' ', p + 1); q != string::npos)
-			this->description = mline.substr(q + 1);
+			mDescription = mline.substr(q + 1, mline.find(' ', q + 1) - (q + 1));
+}
+
+string Description::Entry::generateSdp(string_view eol) const {
+	std::ostringstream sdp;
+	sdp << "m=" << type() << ' ' << 0 << ' ' << description() << eol;
+	sdp << "c=IN IP4 0.0.0.0" << eol;
+	sdp << "a=bundle-only" << eol;
+	sdp << "a=mid:" << mMid << eol;
+
+	switch (mDirection) {
+	case Direction::RecvOnly:
+		sdp << "a=recvonly" << eol;
+		break;
+	case Direction::SendOnly:
+		sdp << "a=sendonly" << eol;
+		break;
+	case Direction::SendRecv:
+		sdp << "a=sendrecv" << eol;
+		break;
+	default:
+		// Ignore
+		break;
+	}
+
+	for (const auto &attr : mAttributes)
+		sdp << "a=" << attr << eol;
+
+	return sdp.str();
+}
+
+void Description::Entry::parseSdpLine(string_view line) {
+	if (match_prefix(line, "a=")) {
+		string_view attr = line.substr(2);
+		auto [key, value] = parse_pair(attr);
+
+		if (key == "mid")
+			mMid = value;
+		else if (key == "sendrecv")
+			mDirection = Direction::SendRecv;
+		else if (attr == "recvonly")
+			mDirection = Direction::RecvOnly;
+		else if (attr == "sendonly")
+			mDirection = Direction::SendOnly;
+		else
+			mAttributes.emplace_back(line.substr(2));
+	}
+}
+
+Description::Application::Application(string mid)
+    : Entry("application 9 UDP/DTLS/SCTP", std::move(mid), Direction::SendRecv) {}
+
+string Description::Application::description() const {
+	return Entry::description() + " webrtc-datachannel";
+}
+
+Description::Application Description::Application::reciprocate() const {
+	Application reciprocated(*this);
+
+	reciprocated.mMaxMessageSize.reset();
+
+	return reciprocated;
+}
+
+string Description::Application::generateSdp(string_view eol) const {
+	std::ostringstream sdp;
+	sdp << Entry::generateSdp(eol);
+
+	if (mSctpPort)
+		sdp << "a=sctp-port:" << *mSctpPort << eol;
+
+	if (mMaxMessageSize)
+		sdp << "a=max-message-size:" << *mMaxMessageSize << eol;
+
+	return sdp.str();
+}
+
+void Description::Application::parseSdpLine(string_view line) {
+	if (match_prefix(line, "a=")) {
+		string_view attr = line.substr(2);
+		auto [key, value] = parse_pair(attr);
+
+		if (key == "sctp-port") {
+			mSctpPort = to_integer<uint16_t>(value);
+		} else if (key == "max-message-size") {
+			mMaxMessageSize = to_integer<size_t>(value);
+		} else {
+			Entry::parseSdpLine(line);
+		}
+	} else {
+		Entry::parseSdpLine(line);
+	}
+}
+
+Description::Media::Media(string mline, string mid, Direction dir)
+    : Entry(std::move(mline), std::move(mid), dir) {
+	mAttributes.emplace_back("rtcp-mux");
+	mAttributes.emplace_back("rtcp-mux-only");
+}
+
+string Description::Media::description() const {
+	std::ostringstream desc;
+	desc << Entry::description();
+	for (auto it = mRtpMap.begin(); it != mRtpMap.end(); ++it)
+		desc << ' ' << it->first;
+
+	return desc.str();
+}
+
+Description::Media Description::Media::reciprocate() const {
+	Media reciprocated(*this);
+
+	// Invert direction
+	switch (reciprocated.mDirection) {
+	case Direction::RecvOnly:
+		reciprocated.mDirection = Direction::SendOnly;
+		break;
+	case Direction::SendOnly:
+		reciprocated.mDirection = Direction::RecvOnly;
+		break;
+	default:
+		// We are good
+		break;
+	}
+
+	return reciprocated;
+}
+
+Description::Media::RTPMap &Description::Media::getFormat(int fmt) {
+	auto it = mRtpMap.find(fmt);
+	if (it != mRtpMap.end())
+		return it->second;
+
+	throw std::invalid_argument("mLineIndex is out of bounds");
+}
+
+Description::Media::RTPMap &Description::Media::getFormat(const string &fmt) {
+	for (auto it = mRtpMap.begin(); it != mRtpMap.end(); ++it)
+		if (it->second.format == fmt)
+			return it->second;
+
+	throw std::invalid_argument("format was not found");
+}
+
+void Description::Media::removeFormat(const string &fmt) {
+	auto it = mRtpMap.begin();
+	std::vector<int> remed;
+
+	// Remove the actual formats
+	while (it != mRtpMap.end()) {
+		if (it->second.format == fmt) {
+			remed.emplace_back(it->first);
+			it = mRtpMap.erase(it);
+		} else {
+			it++;
+		}
+	}
+
+	// Remove any other rtpmaps that depend on the formats we just removed
+	it = mRtpMap.begin();
+	while (it != mRtpMap.end()) {
+		auto it2 = it->second.fmtps.begin();
+		bool rem = false;
+		while (it2 != it->second.fmtps.end()) {
+			if (it2->find("apt=") == 0) {
+				for (auto remid : remed) {
+					if (it2->find(std::to_string(remid)) != string::npos) {
+						std::cout << *it2 << ' ' << remid << std::endl;
+						it = mRtpMap.erase(it);
+						rem = true;
+						break;
+					}
+				}
+				break;
+			}
+			it2++;
+		}
+		if (!rem)
+			it++;
+	}
+}
+
+void Description::Media::addVideoCodec(int payloadType, const string &codec) {
+	RTPMap map(std::to_string(payloadType) + ' ' + codec + "/90000");
+	map.addFB("nack");
+	map.addFB("goog-remb");
+	mRtpMap.emplace(map.pt, map);
 }
 
+void Description::Media::addH264Codec(int pt) { addVideoCodec(pt, "H264"); }
+
+void Description::Media::addVP8Codec(int payloadType) { addVideoCodec(payloadType, "VP8"); }
+
+void Description::Media::addVP9Codec(int payloadType) { addVideoCodec(payloadType, "VP9"); }
+
+void Description::Media::setBitrate(int bitrate) { mBas = bitrate; }
+
+int Description::Media::getBitrate() const { return mBas; }
+
+bool Description::Media::hasPayloadType(int payloadType) const {
+	return mRtpMap.find(payloadType) != mRtpMap.end();
+}
+
+string Description::Media::generateSdp(string_view eol) const {
+	std::ostringstream sdp;
+	sdp << Entry::generateSdp(eol);
+
+	if (mBas >= 0)
+		sdp << "b=AS:" << mBas << eol;
+
+	for (auto it = mRtpMap.begin(); it != mRtpMap.end(); ++it) {
+		auto &map = it->second;
+
+		// Create the a=rtpmap
+		sdp << "a=rtpmap:" << map.pt << ' ' << map.format << '/' << map.clockRate;
+		if (!map.encParams.empty())
+			sdp << '/' << map.encParams;
+		sdp << eol;
+
+		for (const auto &val : map.rtcpFbs)
+			sdp << "a=rtcp-fb:" << map.pt << ' ' << val << eol;
+		for (const auto &val : map.fmtps)
+			sdp << "a=fmtp:" << map.pt << ' ' << val << eol;
+	}
+
+	return sdp.str();
+}
+
+void Description::Media::parseSdpLine(string_view line) {
+	if (match_prefix(line, "a=")) {
+		string_view attr = line.substr(2);
+		auto [key, value] = parse_pair(attr);
+
+		if (key == "rtpmap") {
+			Description::Media::RTPMap map(value);
+			int pt = map.pt;
+			mRtpMap.emplace(pt, std::move(map));
+		} else if (key == "rtcp-fb") {
+			size_t p = value.find(' ');
+			int pt = to_integer<int>(value.substr(0, p));
+			auto it = mRtpMap.find(pt);
+			if (it == mRtpMap.end()) {
+				PLOG_WARNING << "rtcp-fb applied before its rtpmap. Ignoring";
+			} else {
+				it->second.rtcpFbs.emplace_back(value.substr(p + 1));
+			}
+		} else if (key == "fmtp") {
+			size_t p = value.find(' ');
+			int pt = to_integer<int>(value.substr(0, p));
+			auto it = mRtpMap.find(pt);
+			if (it == mRtpMap.end()) {
+				PLOG_WARNING << "fmtp applied before its rtpmap. Ignoring";
+			} else {
+				it->second.fmtps.emplace_back(value.substr(p + 1));
+			}
+		} else {
+			Entry::parseSdpLine(line);
+		}
+	} else if (match_prefix(line, "b=AS")) {
+		mBas = to_integer<int>(line.substr(line.find(':') + 1));
+	} else {
+		Entry::parseSdpLine(line);
+	}
+}
+
+Description::Media::RTPMap::RTPMap(string_view mline) {
+	size_t p = mline.find(' ');
+
+	this->pt = to_integer<int>(mline.substr(0, p));
+
+	string_view line = mline.substr(p + 1);
+	size_t spl = line.find('/');
+	this->format = line.substr(0, spl);
+
+	line = line.substr(spl + 1);
+	spl = line.find('/');
+	if (spl == string::npos) {
+		spl = line.find(' ');
+	}
+	if (spl == string::npos)
+		this->clockRate = to_integer<int>(line);
+	else {
+		this->clockRate = to_integer<int>(line.substr(0, spl));
+		this->encParams = line.substr(spl);
+	}
+}
+
+void Description::Media::RTPMap::removeFB(const string &string) {
+	auto it = rtcpFbs.begin();
+	while (it != rtcpFbs.end()) {
+		if (it->find(string) != std::string::npos) {
+			it = rtcpFbs.erase(it);
+		} else
+			it++;
+	}
+}
+
+void Description::Media::RTPMap::addFB(const string &string) { rtcpFbs.emplace_back(string); }
+
+Description::Audio::Audio(string mid, Direction dir)
+    : Media("audio 9 UDP/TLS/RTP/SAVPF", std::move(mid), dir) {}
+
+Description::Video::Video(string mid, Direction dir)
+    : Media("video 9 UDP/TLS/RTP/SAVPF", std::move(mid), dir) {}
+
 Description::Type Description::stringToType(const string &typeString) {
 	if (typeString == "offer")
 		return Type::Offer;
@@ -368,4 +712,3 @@ string Description::roleToString(Role role) {
 std::ostream &operator<<(std::ostream &out, const rtc::Description &description) {
 	return out << std::string(description);
 }
-

+ 11 - 4
src/dtlssrtptransport.cpp

@@ -84,9 +84,11 @@ bool DtlsSrtpTransport::sendMedia(message_ptr message) {
 
 	int size = message->size();
 	PLOG_VERBOSE << "Send size=" << size;
+//    return outgoing(message);
 
 	// The RTP header has a minimum size of 12 bytes
-	if (size < 12)
+	// An RTCP packet can have a minimum size of 8 bytes
+	if (size < 8)
 		throw std::runtime_error("RTP/RTCP packet too short");
 
 	// srtp_protect() and srtp_protect_rtcp() assume that they can write SRTP_MAX_TRAILER_LEN (for
@@ -124,8 +126,8 @@ bool DtlsSrtpTransport::sendMedia(message_ptr message) {
 	}
 
 	message->resize(size);
-	outgoing(message);
-	return true;
+	return outgoing(message);
+//	return DtlsTransport::send(message);
 }
 
 void DtlsSrtpTransport::incoming(message_ptr message) {
@@ -153,7 +155,8 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
 
 	} else if (value1 >= 128 && value1 <= 191) {
 		// The RTP header has a minimum size of 12 bytes
-		if (size < 12) {
+		// An RTCP packet can have a minimum size of 8 bytes
+		if (size < 8) {
 			PLOG_WARNING << "Incoming SRTP/SRTCP packet too short, size=" << size;
 			return;
 		}
@@ -175,6 +178,8 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
 				return;
 			}
 			PLOG_VERBOSE << "Unprotected SRTCP packet, size=" << size;
+			message->type = Message::Type::Control;
+			message->stream = to_integer<uint8_t>(*(message->begin() + 1)); // Payload Type
 		} else {
 			PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
 			if (srtp_err_status_t err = srtp_unprotect(mSrtpIn, message->data(), &size)) {
@@ -187,6 +192,8 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
 				return;
 			}
 			PLOG_VERBOSE << "Unprotected SRTP packet, size=" << size;
+			message->type = Message::Type::Binary;
+			message->stream = value2; // Payload Type
 		}
 
 		message->resize(size);

+ 0 - 1
src/dtlstransport.cpp

@@ -149,7 +149,6 @@ void DtlsTransport::postHandshake() {
 
 void DtlsTransport::runRecvLoop() {
 	const size_t maxMtu = 4096;
-
 	// Handshake loop
 	try {
 		changeState(State::Connecting);

+ 4 - 3
src/icetransport.cpp

@@ -124,7 +124,8 @@ void IceTransport::setRemoteDescription(const Description &description) {
 	mRole = description.role() == Description::Role::Active ? Description::Role::Passive
 	                                                        : Description::Role::Active;
 	mMid = description.bundleMid();
-	if (juice_set_remote_description(mAgent.get(), description.generateDataSdp("\r\n").c_str()) < 0)
+	if (juice_set_remote_description(mAgent.get(),
+	                                 description.generateApplicationSdp("\r\n").c_str()) < 0)
 		throw std::runtime_error("Failed to parse ICE settings from remote SDP");
 }
 
@@ -487,8 +488,8 @@ void IceTransport::setRemoteDescription(const Description &description) {
 	mTrickleTimeout = !description.ended() ? 30s : 0s;
 
 	// Warning: libnice expects "\n" as end of line
-	if (nice_agent_parse_remote_sdp(mNiceAgent.get(), description.generateDataSdp("\n").c_str()) <
-	    0)
+	if (nice_agent_parse_remote_sdp(mNiceAgent.get(),
+	                                description.generateApplicationSdp("\n").c_str()) < 0)
 		throw std::runtime_error("Failed to parse ICE settings from remote SDP");
 }
 

+ 3 - 6
src/message.cpp

@@ -48,15 +48,12 @@ message_ptr make_message(message_variant data) {
 	    std::move(data));
 }
 
-std::optional<message_variant> to_variant(Message &&message) {
+message_variant to_variant(Message &&message) {
 	switch (message.type) {
 	case Message::String:
-		return std::make_optional(
-		    string(reinterpret_cast<const char *>(message.data()), message.size()));
-	case Message::Binary:
-		return std::make_optional(std::move(message));
+		return string(reinterpret_cast<const char *>(message.data()), message.size());
 	default:
-		return nullopt;
+		return std::move(message);
 	}
 }
 

+ 155 - 58
src/peerconnection.cpp

@@ -81,7 +81,7 @@ std::optional<Description> PeerConnection::remoteDescription() const {
 	return mRemoteDescription;
 }
 
-void PeerConnection::setLocalDescription(std::optional<Description> mediaDescription) {
+void PeerConnection::setLocalDescription() {
 	PLOG_VERBOSE << "Setting local description";
 
 	if (std::atomic_load(&mIceTransport))
@@ -92,16 +92,16 @@ void PeerConnection::setLocalDescription(std::optional<Description> mediaDescrip
 	// See https://tools.ietf.org/html/rfc5763#section-5
 	auto iceTransport = initIceTransport(Description::Role::ActPass);
 	Description localDescription = iceTransport->getLocalDescription(Description::Type::Offer);
-	if (mediaDescription)
-		localDescription.addMedia(*mediaDescription);
 	processLocalDescription(localDescription);
 	iceTransport->gatherLocalCandidates();
 }
 
-void PeerConnection::setRemoteDescription(Description description,
-                                          std::optional<Description> mediaDescription) {
+void PeerConnection::setRemoteDescription(Description description) {
 	PLOG_VERBOSE << "Setting remote description: " << string(description);
 
+	if (description.mediaCount() == 0)
+		throw std::runtime_error("Remote description has no media line");
+
 	if (!description.fingerprint())
 		throw std::runtime_error("Remote description is incomplete");
 
@@ -122,8 +122,6 @@ void PeerConnection::setRemoteDescription(Description description,
 	if (type == Description::Type::Offer) {
 		// This is an offer and we are the answerer.
 		Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
-		if (mediaDescription)
-			localDescription.addMedia(*mediaDescription);
 		processLocalDescription(localDescription);
 		iceTransport->gatherLocalCandidates();
 	} else {
@@ -185,9 +183,8 @@ std::optional<string> PeerConnection::remoteAddress() const {
 	return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
 }
 
-shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
-                                                          const string &protocol,
-                                                          const Reliability &reliability) {
+shared_ptr<DataChannel> PeerConnection::createDataChannel(string label, string protocol,
+                                                          Reliability reliability) {
 	// RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
 	// setup:passive. [...] Thus, setup:active is RECOMMENDED.
 	// See https://tools.ietf.org/html/rfc5763#section-5
@@ -195,7 +192,8 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
 	auto iceTransport = std::atomic_load(&mIceTransport);
 	auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
 
-	auto channel = emplaceDataChannel(role, label, protocol, reliability);
+	auto channel =
+	    emplaceDataChannel(role, std::move(label), std::move(protocol), std::move(reliability));
 
 	if (!iceTransport) {
 		// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
@@ -235,33 +233,20 @@ void PeerConnection::onGatheringStateChange(std::function<void(GatheringState st
 
 bool PeerConnection::hasMedia() const {
 	auto local = localDescription();
-	auto remote = remoteDescription();
-	return (local && local->hasMedia()) || (remote && remote->hasMedia());
+	return local && local->hasAudioOrVideo();
 }
 
-void PeerConnection::sendMedia(binary packet) {
-	outgoingMedia(make_message(std::move(packet), Message::Binary));
-}
+std::shared_ptr<Track> PeerConnection::createTrack(Description::Media description) {
+	if (localDescription())
+		throw std::logic_error("Tracks must be created before local description");
 
-void PeerConnection::sendMedia(const byte *packet, size_t size) {
-	outgoingMedia(make_message(packet, packet + size, Message::Binary));
+	auto track = std::make_shared<Track>(std::move(description));
+	mTracks.emplace(std::make_pair(track->mid(), track));
+	return track;
 }
 
-void PeerConnection::onMedia(std::function<void(binary)> callback) { mMediaCallback = callback; }
-
-void PeerConnection::outgoingMedia([[maybe_unused]] message_ptr message) {
-	if (!hasMedia())
-		throw std::runtime_error("PeerConnection has no media support");
-
-#if RTC_ENABLE_MEDIA
-	auto transport = std::atomic_load(&mDtlsTransport);
-	if (!transport)
-		throw std::runtime_error("PeerConnection is not open");
-
-	std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->sendMedia(message);
-#else
-	PLOG_WARNING << "Ignoring sent media (not compiled with SRTP support)";
-#endif
+void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callback) {
+	mTrackCallback = callback;
 }
 
 shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
@@ -342,7 +327,11 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
 
 			switch (state) {
 			case DtlsTransport::State::Connected:
-				initSctpTransport();
+				if (auto local = localDescription())
+					if (local->hasApplication())
+						initSctpTransport();
+
+				openTracks();
 				break;
 			case DtlsTransport::State::Failed:
 				changeState(State::Failed);
@@ -396,7 +385,11 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
 		if (auto transport = std::atomic_load(&mSctpTransport))
 			return transport;
 
-		uint16_t sctpPort = remoteDescription()->sctpPort().value_or(DEFAULT_SCTP_PORT);
+		auto remote = remoteDescription();
+		if (!remote || !remote->application())
+			throw std::logic_error("Initializing SCTP transport without application description");
+
+		uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
 		auto lower = std::atomic_load(&mDtlsTransport);
 		auto transport = std::make_shared<SctpTransport>(
 		    lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
@@ -525,8 +518,56 @@ void PeerConnection::forwardMessage(message_ptr message) {
 }
 
 void PeerConnection::forwardMedia(message_ptr message) {
-	if (message)
-		mMediaCallback(std::move(*message));
+	if (!message)
+		return;
+
+	if (message->type == Message::Type::Control) {
+		std::shared_lock lock(mTracksMutex); // read-only
+		for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
+			if (auto track = it->second.lock())
+				return track->incoming(message);
+
+		PLOG_WARNING << "No track available to receive control, dropping";
+		return;
+	}
+
+	unsigned int payloadType = message->stream;
+	std::optional<string> mid;
+	if (auto it = mMidFromPayloadType.find(payloadType); it != mMidFromPayloadType.end()) {
+		mid = it->second;
+	} else {
+		std::lock_guard lock(mLocalDescriptionMutex);
+		if (!mLocalDescription)
+			return;
+
+		for (int i = 0; i < mLocalDescription->mediaCount(); ++i) {
+			if (auto found = std::visit(
+			        rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
+				                        return std::nullopt;
+			                        },
+			                        [&](Description::Media *media) -> std::optional<string> {
+				                        return media->hasPayloadType(payloadType)
+				                                   ? std::make_optional(media->mid())
+				                                   : nullopt;
+			                        }},
+			        mLocalDescription->media(i))) {
+
+				mMidFromPayloadType.emplace(payloadType, *found);
+				mid = *found;
+				break;
+			}
+		}
+	}
+
+	if (!mid) {
+		PLOG_WARNING << "Track not found for payload type " << payloadType << ", dropping";
+		return;
+	}
+
+	std::shared_lock lock(mTracksMutex); // read-only
+	if (auto it = mTracks.find(*mid); it != mTracks.end())
+		if (auto track = it->second.lock())
+			track->incoming(message);
 }
 
 void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
@@ -534,10 +575,9 @@ void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
 		channel->triggerBufferedAmount(amount);
 }
 
-shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role,
-                                                           const string &label,
-                                                           const string &protocol,
-                                                           const Reliability &reliability) {
+shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role, string label,
+                                                           string protocol,
+                                                           Reliability reliability) {
 	// The active side must use streams with even identifiers, whereas the passive side must use
 	// streams with odd identifiers.
 	// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09#section-6
@@ -548,8 +588,8 @@ shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role rol
 		if (stream >= 65535)
 			throw std::runtime_error("Too many DataChannels");
 	}
-	auto channel =
-	    std::make_shared<DataChannel>(shared_from_this(), stream, label, protocol, reliability);
+	auto channel = std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
+	                                             std::move(protocol), std::move(reliability));
 	mDataChannels.emplace(std::make_pair(stream, channel));
 	return channel;
 }
@@ -598,6 +638,21 @@ void PeerConnection::openDataChannels() {
 		iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
 }
 
+void PeerConnection::openTracks() {
+#if RTC_ENABLE_MEDIA
+	if (!hasMedia())
+		return;
+
+	if (auto transport = std::atomic_load(&mDtlsTransport)) {
+		auto srtpTransport = std::reinterpret_pointer_cast<DtlsSrtpTransport>(transport);
+		std::shared_lock lock(mTracksMutex); // read-only
+		for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
+			if (auto track = it->second.lock())
+				track->open(srtpTransport);
+	}
+#endif
+}
+
 void PeerConnection::closeDataChannels() {
 	iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
 }
@@ -607,25 +662,59 @@ void PeerConnection::remoteCloseDataChannels() {
 }
 
 void PeerConnection::processLocalDescription(Description description) {
-	std::optional<uint16_t> remoteSctpPort;
-	std::optional<string> remoteDataMid;
 	if (auto remote = remoteDescription()) {
-		remoteDataMid = remote->dataMid();
-	    remoteSctpPort = remote->sctpPort();
-	}
+		// Reciprocate remote description
+		for (int i = 0; i < remote->mediaCount(); ++i)
+			std::visit( // reciprocate each media
+			    rtc::overloaded{
+			        [&](Description::Application *app) {
+				        PLOG_DEBUG << "Reciprocating application in local description, mid=\""
+				                   << app->mid() << "\"";
+				        auto reciprocated = app->reciprocate();
+				        reciprocated.hintSctpPort(DEFAULT_SCTP_PORT);
+				        reciprocated.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
+				        description.addMedia(std::move(reciprocated));
+			        },
+			        [&](Description::Media *media) {
+				        PLOG_DEBUG << "Reciprocating media in local description, mid=\""
+				                   << media->mid() << "\"";
+
+				        description.addMedia(media->reciprocate());
+			        },
+			    },
+			    remote->media(i));
+	} else {
+		// Add application for data channels
+		{
+			std::shared_lock lock(mDataChannelsMutex);
+			if (!mDataChannels.empty()) {
+				const string mid = "data";
+				PLOG_DEBUG << "Adding application to local description, mid=\"" << mid << "\"";
+				Description::Application app;
+				app.setSctpPort(DEFAULT_SCTP_PORT);
+				app.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
+				description.addMedia(std::move(app));
+			}
+		}
 
-	auto certificate = mCertificate.get(); // wait for certificate if not ready
+		// Add media for local tracks
+		{
+			std::shared_lock lock(mTracksMutex);
+			for (auto it = mTracks.begin(); it != mTracks.end(); ++it) {
+				if (auto track = it->second.lock()) {
+					PLOG_DEBUG << "Adding media to local description, mid=\"" << track->mid()
+					           << "\"";
+					description.addMedia(track->description());
+				}
+			}
+		}
+	}
 
-	{
-		std::lock_guard lock(mLocalDescriptionMutex);
-		mLocalDescription.emplace(std::move(description));
-		if (remoteDataMid)
-			mLocalDescription->setDataMid(*remoteDataMid);
+	// Set local fingerprint (wait for certificate if necessary)
+	description.setFingerprint(mCertificate.get()->fingerprint());
 
-		mLocalDescription->setFingerprint(certificate->fingerprint());
-		mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
-		mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
-	}
+	std::lock_guard lock(mLocalDescriptionMutex);
+	mLocalDescription.emplace(std::move(description));
 
 	mProcessor->enqueue([this, description = *mLocalDescription]() {
 		mLocalDescriptionCallback(std::move(description));
@@ -653,6 +742,14 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 	    [this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
 }
 
+void PeerConnection::triggerTrack(std::weak_ptr<Track> weakTrack) {
+	auto track = weakTrack.lock();
+	if (!track)
+		return;
+
+	mProcessor->enqueue([this, track = std::move(track)]() { mTrackCallback(track); });
+}
+
 bool PeerConnection::changeState(State state) {
 	State current;
 	do {

+ 133 - 0
src/track.cpp

@@ -0,0 +1,133 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "track.hpp"
+#include "dtlssrtptransport.hpp"
+#include "include.hpp"
+
+namespace rtc {
+
+using std::shared_ptr;
+using std::weak_ptr;
+
+Track::Track(Description::Media description)
+    : mMediaDescription(std::move(description)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
+
+string Track::mid() const { return mMediaDescription.mid(); }
+
+Description::Media Track::description() const { return mMediaDescription; }
+
+void Track::close() {
+	mIsClosed = true;
+	resetCallbacks();
+	setRtcpHandler(nullptr);
+}
+
+bool Track::send(message_variant data) { return outgoing(make_message(std::move(data))); }
+
+bool Track::send(const byte *data, size_t size) {
+	return outgoing(std::make_shared<Message>(data, data + size, Message::Binary));
+}
+
+std::optional<message_variant> Track::receive() {
+	if (!mRecvQueue.empty())
+		return to_variant(std::move(**mRecvQueue.pop()));
+
+	return nullopt;
+}
+
+bool Track::isOpen(void) const {
+#if RTC_ENABLE_MEDIA
+	return !mIsClosed && mDtlsSrtpTransport.lock();
+#else
+	return !mIsClosed;
+#endif
+}
+
+bool Track::isClosed(void) const { return mIsClosed; }
+
+size_t Track::maxMessageSize() const {
+	return 65535 - 12 - 4; // SRTP/UDP
+}
+
+size_t Track::availableAmount() const {
+	return mRecvQueue.amount();
+}
+
+#if RTC_ENABLE_MEDIA
+void Track::open(shared_ptr<DtlsSrtpTransport> transport) { mDtlsSrtpTransport = transport; }
+#endif
+
+bool Track::outgoing(message_ptr message) {
+	if (mIsClosed)
+		throw std::runtime_error("Track is closed");
+
+	if (message->size() > maxMessageSize())
+		throw std::runtime_error("Message size exceeds limit");
+
+#if RTC_ENABLE_MEDIA
+	auto transport = mDtlsSrtpTransport.lock();
+	if (!transport)
+		throw std::runtime_error("Track transport is not open");
+
+	return transport->sendMedia(message);
+#else
+	PLOG_WARNING << "Ignoring track send (not compiled with SRTP support)";
+	return false;
+#endif
+}
+
+void Track::incoming(message_ptr message) {
+	if (!message)
+		return;
+
+	if (mRtcpHandler) {
+		auto opt = mRtcpHandler->incoming(message);
+		if (!opt)
+			return;
+
+		message = *opt;
+	}
+
+	// Tail drop if queue is full
+	if (mRecvQueue.full())
+		return;
+
+	mRecvQueue.push(message);
+	triggerAvailable(mRecvQueue.size());
+}
+
+void Track::setRtcpHandler(std::shared_ptr<RtcpHandler> handler) {
+	if (mRtcpHandler)
+		mRtcpHandler->onOutgoing(nullptr);
+
+	mRtcpHandler = std::move(handler);
+	if (mRtcpHandler) {
+		mRtcpHandler->onOutgoing([&]([[maybe_unused]] message_ptr message) {
+#if RTC_ENABLE_MEDIA
+			if (auto transport = mDtlsSrtpTransport.lock())
+				transport->sendMedia(message);
+#else
+			PLOG_WARNING << "Ignoring RTCP send (not compiled with SRTP support)";
+#endif
+		});
+	}
+}
+
+} // namespace rtc
+

+ 8 - 6
src/websocket.cpp

@@ -38,7 +38,8 @@ namespace rtc {
 using std::shared_ptr;
 
 WebSocket::WebSocket(std::optional<Configuration> config)
-    : mConfig(config ? std::move(*config) : Configuration()) {
+    : mConfig(config ? std::move(*config) : Configuration()),
+      mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {
 	PLOG_VERBOSE << "Creating WebSocket";
 }
 
@@ -109,10 +110,11 @@ bool WebSocket::isClosed() const { return mState == State::Closed; }
 size_t WebSocket::maxMessageSize() const { return DEFAULT_MAX_MESSAGE_SIZE; }
 
 std::optional<message_variant> WebSocket::receive() {
-	while (!mRecvQueue.empty())
-		if (auto variant = to_variant(std::move(**mRecvQueue.pop())))
-			return variant;
-
+	while (!mRecvQueue.empty()) {
+		auto message = *mRecvQueue.pop();
+		if (message->type != Message::Control)
+			return to_variant(std::move(*message));
+	}
 	return nullopt;
 }
 
@@ -319,6 +321,6 @@ void WebSocket::closeTransports() {
 	});
 }
 
-} // namespace rtc
+	} // namespace rtc
 
 #endif

+ 4 - 4
test/connectivity.cpp

@@ -52,7 +52,7 @@ void test_connectivity() {
 		if (!pc2)
 			return;
 		cout << "Description 1: " << sdp << endl;
-		pc2->setRemoteDescription(std::move(sdp));
+		pc2->setRemoteDescription(string(sdp));
 	});
 
 	pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](Candidate candidate) {
@@ -60,7 +60,7 @@ void test_connectivity() {
 		if (!pc2)
 			return;
 		cout << "Candidate 1: " << candidate << endl;
-		pc2->addRemoteCandidate(std::move(candidate));
+		pc2->addRemoteCandidate(string(candidate));
 	});
 
 	pc1->onStateChange([](PeerConnection::State state) { cout << "State 1: " << state << endl; });
@@ -74,7 +74,7 @@ void test_connectivity() {
 		if (!pc1)
 			return;
 		cout << "Description 2: " << sdp << endl;
-		pc1->setRemoteDescription(std::move(sdp));
+		pc1->setRemoteDescription(string(sdp));
 	});
 
 	pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](Candidate candidate) {
@@ -82,7 +82,7 @@ void test_connectivity() {
 		if (!pc1)
 			return;
 		cout << "Candidate 2: " << candidate << endl;
-		pc1->addRemoteCandidate(std::move(candidate));
+		pc1->addRemoteCandidate(string(candidate));
 	});
 
 	pc2->onStateChange([](PeerConnection::State state) { cout << "State 2: " << state << endl; });