Browse Source

Add RTCP Nack responder

Filip Klembara 4 years ago
parent
commit
c2c57b16df
6 changed files with 257 additions and 6 deletions
  1. 2 0
      CMakeLists.txt
  2. 6 0
      include/rtc/rtc.h
  3. 94 0
      include/rtc/rtcpnackresponder.hpp
  4. 29 6
      include/rtc/rtp.hpp
  5. 8 0
      src/capi.cpp
  6. 118 0
      src/rtcpnackresponder.cpp

+ 2 - 0
CMakeLists.txt

@@ -77,6 +77,7 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpchainablehandler.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerelement.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerrootelement.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpnackresponder.cpp
 )
 
 set(LIBDATACHANNEL_WEBSOCKET_SOURCES
@@ -120,6 +121,7 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpchainablehandler.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerelement.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerrootelement.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpnackresponder.hpp
 )
 
 set(TESTS_SOURCES

+ 6 - 0
include/rtc/rtc.h

@@ -41,6 +41,7 @@ extern "C" {
 
 #if RTC_ENABLE_MEDIA
 #define RTC_DEFAULT_MAXIMUM_FRAGMENT_SIZE ((uint16_t)1400)
+#define RTC_DEFAULT_MAXIMUM_PACKET_COUNT_FOR_NACK_CACHE ((unsigned)512)
 #endif
 
 #include <stdbool.h>
@@ -243,6 +244,11 @@ RTC_EXPORT int rtcSetOpusPacketizationHandler(int tr, uint32_t ssrc, const char
 /// @param tr Track id
 int rtcChainRtcpSRReporter(int tr);
 
+/// Chain RtcpNackResponder to handler chain for given track
+/// @param tr Track id
+/// @param maxStoredPacketsCount Maximum stored packet count
+int rtcChainRtcpNackResponder(int tr, unsigned maxStoredPacketsCount);
+
 /// Set start time for RTP stream
 /// @param startTime_s Start time in seconds
 /// @param timeIntervalSince1970 Set true if `startTime_s` is time interval since 1970, false if `startTime_s` is time interval since 1900

+ 94 - 0
include/rtc/rtcpnackresponder.hpp

@@ -0,0 +1,94 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTC_RTCP_NACK_RESPONDER_H
+#define RTC_RTCP_NACK_RESPONDER_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "messagehandlerelement.hpp"
+
+#include <unordered_map>
+#include <queue>
+
+namespace rtc {
+
+class RTC_CPP_EXPORT RtcpNackResponder: public MessageHandlerElement {
+
+	/// Packet storage
+	class RTC_CPP_EXPORT Storage {
+		
+		/// Packet storage element
+		struct RTC_CPP_EXPORT Element {
+			Element(binary_ptr packet, uint16_t sequenceNumber, std::shared_ptr<Element> next = nullptr);
+			const binary_ptr packet;
+			const uint16_t sequenceNumber;
+			/// Pointer to newer element
+			std::shared_ptr<Element> next = nullptr;
+		};
+
+	private:
+		/// Oldest packet in storage
+		std::shared_ptr<Element> oldest = nullptr;
+		/// Newest packet in storage
+		std::shared_ptr<Element> newest = nullptr;
+
+		/// Inner storage
+		std::unordered_map<uint16_t, std::shared_ptr<Element>> storage{};
+
+		/// Maximum storage size
+		const unsigned maximumSize;
+
+		/// Returnst current size
+		unsigned size();
+
+	public:
+		static const unsigned defaultMaximumSize = 512;
+
+		Storage(unsigned _maximumSize);
+
+		/// Returns packet with given sequence number
+		std::optional<binary_ptr> get(uint16_t sequenceNumber);
+
+		/// Stores packet
+		/// @param packet Packet
+		void store(binary_ptr packet);
+	};
+
+	const std::shared_ptr<Storage> storage;
+	std::mutex reportMutex;
+
+public:
+	RtcpNackResponder(unsigned maxStoredPacketCount = Storage::defaultMaximumSize);
+
+	/// Checks for RTCP NACK and handles it,
+	/// @param message RTCP message
+	/// @returns unchanged RTCP message and requested RTP packets
+	ChainedIncomingControlProduct processIncomingControlMessage(message_ptr message) override;
+
+	/// Stores RTP packets in internal storage
+	/// @param messages RTP packets
+	/// @param control RTCP
+	/// @returns Unchanged RTP and RTCP
+	ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, std::optional<message_ptr> control) override;
+};
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */
+
+#endif /* RTC_RTCP_NACK_RESPONDER_H */

+ 29 - 6
include/rtc/rtp.hpp

@@ -617,8 +617,31 @@ struct RTCP_FIR {
 };
 
 struct RTCP_NACK_PART {
-	uint16_t pid;
-	uint16_t blp;
+	uint16_t _pid;
+	uint16_t _blp;
+
+	uint16_t getPID() { return ntohs(_pid); }
+	uint16_t getBLP() { return ntohs(_blp); }
+
+	void setPID(uint16_t pid) { _pid = htons(pid); }
+	void setBLP(uint16_t blp) { _blp = htons(blp); }
+
+	std::vector<uint16_t> getSequenceNumbers() {
+		std::vector<uint16_t> result{};
+		result.reserve(17);
+		auto pid = getPID();
+		result.push_back(pid);
+		auto bitmask = getBLP();
+		auto i = pid + 1;
+		while (bitmask > 0) {
+			if (bitmask & 0x1) {
+				result.push_back(i);
+			}
+			i += 1;
+			bitmask >>= 1;
+		}
+		return result;
+	}
 };
 
 class RTCP_NACK {
@@ -644,16 +667,16 @@ public:
 	 */
 	bool addMissingPacket(unsigned int *fciCount, uint16_t *fciPID, uint16_t missingPacket) {
 		if (*fciCount == 0 || missingPacket < *fciPID || missingPacket > (*fciPID + 16)) {
-			parts[*fciCount].pid = htons(missingPacket);
-			parts[*fciCount].blp = 0;
+			parts[*fciCount].setPID(missingPacket);
+			parts[*fciCount].setBLP(0);
 			*fciPID = missingPacket;
 			(*fciCount)++;
 			return true;
 		} else {
 			// TODO SPEEED!
-			auto blp = ntohs(parts[(*fciCount) - 1].blp);
+			auto blp = parts[(*fciCount) - 1].getBLP();
 			auto newBit = 1u << (unsigned int)(missingPacket - (1 + *fciPID));
-			parts[(*fciCount) - 1].blp = htons(blp | newBit);
+			parts[(*fciCount) - 1].setBLP(blp | newBit);
 			return false;
 		}
 	}

+ 8 - 0
src/capi.cpp

@@ -593,6 +593,14 @@ int rtcChainRtcpSRReporter(int tr) {
 	});
 }
 
+int rtcChainRtcpNackResponder(int tr, unsigned maxStoredPacketsCount) {
+	return WRAP({
+		auto responder = std::make_shared<RtcpNackResponder>(maxStoredPacketsCount);
+		auto chainableHandler = getRTCPChainableHandler(tr);
+		chainableHandler->addToChain(responder);
+	});
+}
+
 int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeIntervalSince1970,
                                     uint32_t timestamp) {
 	return WRAP({

+ 118 - 0
src/rtcpnackresponder.cpp

@@ -0,0 +1,118 @@
+/**
+ * libdatachannel streamer example
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "rtcpnackresponder.hpp"
+
+namespace rtc {
+
+RtcpNackResponder::Storage::Element::Element(binary_ptr packet, uint16_t sequenceNumber, std::shared_ptr<Element> next)
+: packet(packet), sequenceNumber(sequenceNumber), next(next) { }
+
+unsigned RtcpNackResponder::Storage::size() { return storage.size(); }
+
+RtcpNackResponder::Storage::Storage(unsigned _maximumSize): maximumSize(_maximumSize) {
+	assert(maximumSize > 0);
+	storage.reserve(maximumSize);
+}
+
+std::optional<binary_ptr> RtcpNackResponder::Storage::get(uint16_t sequenceNumber) {
+	auto position = storage.find(sequenceNumber);
+	return position != storage.end() ? std::make_optional(storage.at(sequenceNumber)->packet) : nullopt;
+}
+
+void RtcpNackResponder::Storage::store(binary_ptr packet) {
+	if (!packet || packet->size() < 12) {
+		return;
+	}
+	auto rtp = reinterpret_cast<RTP *>(packet->data());
+	auto sequenceNumber = rtp->seqNumber();
+
+	assert((storage.empty() && !oldest && !newest) || (!storage.empty() && oldest && newest));
+
+	if (size() == 0) {
+		newest = std::make_shared<Element>(packet, sequenceNumber);
+		oldest = newest;
+	} else {
+		auto current = std::make_shared<Element>(packet, sequenceNumber);
+		newest->next = current;
+		newest = current;
+	}
+
+	storage.emplace(sequenceNumber, newest);
+
+	if (size() > maximumSize) {
+		assert(oldest);
+		if (oldest) {
+			storage.erase(oldest->sequenceNumber);
+			oldest = oldest->next;
+		}
+	}
+}
+
+RtcpNackResponder::RtcpNackResponder(unsigned maxStoredPacketCount)
+: MessageHandlerElement(), storage(std::make_shared<Storage>(maxStoredPacketCount)) { }
+
+ChainedIncomingControlProduct RtcpNackResponder::processIncomingControlMessage(message_ptr message) {
+	std::optional<ChainedOutgoingResponseProduct> optPackets = ChainedOutgoingResponseProduct();
+	auto packets = make_chained_messages_product();
+
+	unsigned int i = 0;
+	while (i < message->size()) {
+		auto nack = reinterpret_cast<RTCP_NACK *>(message->data() + i);
+		i += nack->header.header.lengthInBytes();
+		// check if rtcp is nack
+		if (nack->header.header.payloadType() != 205 || nack->header.header.reportCount() != 1) {
+			continue;
+		}
+		
+		auto fieldsCount = nack->getSeqNoCount();
+
+		std::vector<uint16_t> missingSequenceNumbers{};
+		for(unsigned int i = 0; i < fieldsCount; i++) {
+			auto field = nack->parts[i];
+			auto newMissingSeqenceNumbers = field.getSequenceNumbers();
+			missingSequenceNumbers.insert(missingSequenceNumbers.end(), newMissingSeqenceNumbers.begin(), newMissingSeqenceNumbers.end());
+		}
+		packets->reserve(packets->size() + missingSequenceNumbers.size());
+		for (auto sequenceNumber: missingSequenceNumbers) {
+			auto optPacket = storage->get(sequenceNumber);
+			if (optPacket.has_value()) {
+				auto packet = optPacket.value();
+				packets->push_back(packet);
+			}
+		}
+	}
+
+	if (!packets->empty()) {
+		return {message, ChainedOutgoingResponseProduct(packets)};
+	} else {
+		return {message, nullopt};
+	}
+}
+
+ChainedOutgoingProduct RtcpNackResponder::processOutgoingBinaryMessage(ChainedMessagesProduct messages, std::optional<message_ptr> control) {
+	for (auto message: *messages) {
+		storage->store(message);
+	}
+	return {messages, control};
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */