Przeglądaj źródła

Merge pull request #1396 from paullouisageneau/rewrite-h264-depacketizer

Rewrite H264 and H265 RTP depacketization
Paul-Louis Ageneau 1 miesiąc temu
rodzic
commit
f666cc442c

+ 7 - 15
include/rtc/h264rtpdepacketizer.hpp

@@ -13,34 +13,26 @@
 #if RTC_ENABLE_MEDIA
 
 #include "common.hpp"
-#include "mediahandler.hpp"
 #include "message.hpp"
 #include "nalunit.hpp"
 #include "rtp.hpp"
-
-#include <iterator>
+#include "rtpdepacketizer.hpp"
 
 namespace rtc {
 
 /// RTP depacketization for H264
-class RTC_CPP_EXPORT H264RtpDepacketizer : public MediaHandler {
+class RTC_CPP_EXPORT H264RtpDepacketizer final : public VideoRtpDepacketizer {
 public:
 	using Separator = NalUnit::Separator;
 
-	inline static const uint32_t ClockRate = 90 * 1000;
-
-	H264RtpDepacketizer(Separator separator = Separator::LongStartSequence);
-	virtual ~H264RtpDepacketizer() = default;
-
-	void incoming(message_vector &messages, const message_callback &send) override;
+	H264RtpDepacketizer(Separator separator = Separator::StartSequence);
+	~H264RtpDepacketizer();
 
 private:
-	std::vector<message_ptr> mRtpBuffer;
-	const NalUnit::Separator mSeparator;
+	message_ptr reassemble(message_buffer &buffer) override;
+	void addSeparator(binary &frame);
 
-	void addSeparator(binary &accessUnit);
-	message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt,
-	                           uint8_t payloadType, uint32_t timestamp);
+	const NalUnit::Separator mSeparator;
 };
 
 } // namespace rtc

+ 8 - 14
include/rtc/h265rtpdepacketizer.hpp

@@ -15,33 +15,27 @@
 
 #include "common.hpp"
 #include "h265nalunit.hpp"
-#include "mediahandler.hpp"
 #include "message.hpp"
 #include "rtp.hpp"
+#include "rtpdepacketizer.hpp"
 
-#include <iterator>
+#include <set>
 
 namespace rtc {
 
 /// RTP depacketization for H265
-class RTC_CPP_EXPORT H265RtpDepacketizer : public MediaHandler {
+class RTC_CPP_EXPORT H265RtpDepacketizer final : public VideoRtpDepacketizer {
 public:
 	using Separator = NalUnit::Separator;
 
-	inline static const uint32_t ClockRate = 90 * 1000;
-
-	H265RtpDepacketizer(Separator separator = Separator::LongStartSequence);
-	virtual ~H265RtpDepacketizer() = default;
-
-	void incoming(message_vector &messages, const message_callback &send) override;
+	H265RtpDepacketizer(Separator separator = Separator::StartSequence);
+	~H265RtpDepacketizer();
 
 private:
-	std::vector<message_ptr> mRtpBuffer;
-	const NalUnit::Separator separator;
+	message_ptr reassemble(message_buffer &buffer);
+	void addSeparator(binary &frame);
 
-	void addSeparator(binary& accessUnit);
-	message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt,
-	                           uint8_t payloadType, uint32_t timestamp);
+	const NalUnit::Separator mSeparator;
 };
 
 } // namespace rtc

+ 44 - 3
include/rtc/rtpdepacketizer.hpp

@@ -14,8 +14,11 @@
 #include "mediahandler.hpp"
 #include "message.hpp"
 
+#include <set>
+
 namespace rtc {
 
+// Base RTP depacketizer class
 class RTC_CPP_EXPORT RtpDepacketizer : public MediaHandler {
 public:
 	RtpDepacketizer();
@@ -24,12 +27,50 @@ public:
 
 	virtual void incoming(message_vector &messages, const message_callback &send) override;
 
+protected:
+	shared_ptr<FrameInfo> createFrameInfo(uint32_t timestamp, uint8_t payloadType) const;
+
 private:
-	uint32_t mClockRate;
+	const uint32_t mClockRate;
+};
+
+// Base class for video RTP depacketizer
+class RTC_CPP_EXPORT VideoRtpDepacketizer : public RtpDepacketizer {
+public:
+	inline static const uint32_t ClockRate = 90000;
+
+	VideoRtpDepacketizer();
+	virtual ~VideoRtpDepacketizer();
+
+protected:
+	struct sequence_cmp {
+		bool operator()(message_ptr a, message_ptr b) const;
+	};
+	using message_buffer = std::set<message_ptr, sequence_cmp>;
+
+	virtual message_ptr reassemble(message_buffer &messages) = 0;
+
+private:
+	void incoming(message_vector &messages, const message_callback &send) override;
+
+	message_buffer mBuffer;
+};
+
+// Generic audio RTP depacketizer
+template <uint32_t DEFAULT_CLOCK_RATE>
+class RTC_CPP_EXPORT AudioRtpDepacketizer final : public RtpDepacketizer {
+public:
+	inline static const uint32_t DefaultClockRate = DEFAULT_CLOCK_RATE;
+
+	AudioRtpDepacketizer(uint32_t clockRate = DefaultClockRate) : RtpDepacketizer(clockRate) {}
 };
 
-using OpusRtpDepacketizer = RtpDepacketizer;
-using AACRtpDepacketizer = RtpDepacketizer;
+// Audio RTP depacketizers
+using OpusRtpDepacketizer = AudioRtpDepacketizer<48000>;
+using AACRtpDepacketizer = AudioRtpDepacketizer<48000>;
+using PCMARtpDepacketizer = AudioRtpDepacketizer<8000>;
+using PCMURtpDepacketizer = AudioRtpDepacketizer<8000>;
+using G722RtpDepacketizer = AudioRtpDepacketizer<8000>;
 
 } // namespace rtc
 

+ 91 - 108
src/h264rtpdepacketizer.cpp

@@ -14,6 +14,7 @@
 #include "impl/internals.hpp"
 
 #include <algorithm>
+#include <cassert>
 #include <chrono>
 
 namespace rtc {
@@ -27,144 +28,126 @@ const uint8_t naluTypeFUA = 28;
 H264RtpDepacketizer::H264RtpDepacketizer(Separator separator) : mSeparator(separator) {
 	if (separator != Separator::StartSequence && separator != Separator::LongStartSequence &&
 	    separator != Separator::ShortStartSequence) {
-		throw std::invalid_argument("Invalid separator");
+		throw std::invalid_argument("Unimplemented H264 separator");
 	}
 }
 
-void H264RtpDepacketizer::addSeparator(binary &accessUnit) {
-	if (mSeparator == Separator::StartSequence || mSeparator == Separator::LongStartSequence) {
-		accessUnit.insert(accessUnit.end(), naluLongStartCode.begin(), naluLongStartCode.end());
-	} else if (mSeparator == Separator::ShortStartSequence) {
-		accessUnit.insert(accessUnit.end(), naluShortStartCode.begin(), naluShortStartCode.end());
-	} else {
-		throw std::invalid_argument("Invalid separator");
-	}
-}
+H264RtpDepacketizer::~H264RtpDepacketizer() {}
 
-message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
-                                                message_vector::iterator end, uint8_t payloadType,
-                                                uint32_t timestamp) {
-	message_vector out;
-	binary accessUnit;
+message_ptr H264RtpDepacketizer::reassemble(message_buffer &buffer) {
+	if (buffer.empty())
+		return nullptr;
 
-	for (auto it = begin; it != end; ++it) {
-		auto pkt = it->get();
-		auto pktParsed = reinterpret_cast<const rtc::RtpHeader *>(pkt->data());
-		auto rtpHeaderSize = pktParsed->getSize() + pktParsed->getExtensionHeaderSize();
-		auto rtpPaddingSize = 0;
+	auto first = *buffer.begin();
+	auto firstRtpHeader = reinterpret_cast<const RtpHeader *>(first->data());
+	uint8_t payloadType = firstRtpHeader->payloadType();
+	uint32_t timestamp = firstRtpHeader->timestamp();
+	uint16_t nextSeqNumber = firstRtpHeader->seqNumber();
 
-		if (pktParsed->padding()) {
-			rtpPaddingSize = std::to_integer<uint8_t>(pkt->at(pkt->size() - 1));
-		}
-
-		if (pkt->size() == rtpHeaderSize + rtpPaddingSize) {
-			PLOG_VERBOSE << "H.264 RTP packet has empty payload";
+	binary frame;
+	bool continuousFragments = false;
+	for (const auto &packet : buffer) {
+		auto rtpHeader = reinterpret_cast<const rtc::RtpHeader *>(packet->data());
+		if (rtpHeader->seqNumber() < nextSeqNumber) {
+			// Skip
 			continue;
 		}
+		if (rtpHeader->seqNumber() > nextSeqNumber) {
+			// Missing packet(s)
+			continuousFragments = false;
+		}
 
-		auto nalUnitHeader = NalUnitHeader{std::to_integer<uint8_t>(pkt->at(rtpHeaderSize))};
+		nextSeqNumber = rtpHeader->seqNumber() + 1;
+
+		auto rtpHeaderSize = rtpHeader->getSize() + rtpHeader->getExtensionHeaderSize();
+		auto paddingSize = 0;
+		if (rtpHeader->padding())
+			paddingSize = std::to_integer<uint8_t>(packet->back());
+
+		if (packet->size() <= rtpHeaderSize + paddingSize)
+			continue; // Empty payload
+
+		auto nalUnitHeader = NalUnitHeader{std::to_integer<uint8_t>(packet->at(rtpHeaderSize))};
 
 		if (nalUnitHeader.unitType() == naluTypeFUA) {
-			auto nalUnitFragmentHeader = NalUnitFragmentHeader{
-			    std::to_integer<uint8_t>(pkt->at(rtpHeaderSize + sizeof(NalUnitHeader)))};
+			if (packet->size() <= rtpHeaderSize + paddingSize + 1)
+				continue; // Empty FU-A
+
+			auto nalUnitFragmentHeader =
+			    NalUnitFragmentHeader{std::to_integer<uint8_t>(packet->at(rtpHeaderSize + 1))};
 
 			// RFC 6184: When set to one, the Start bit indicates the start of a fragmented NAL
 			// unit. When the following FU payload is not the start of a fragmented NAL unit
 			// payload, the Start bit is set to zero.
-			if (nalUnitFragmentHeader.isStart() || accessUnit.empty()) {
-				addSeparator(accessUnit);
-				accessUnit.emplace_back(
-				    byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType()));
+			if (nalUnitFragmentHeader.isStart()) {
+				addSeparator(frame);
+				frame.emplace_back(byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType()));
+				continuousFragments = true;
 			}
 
-			accessUnit.insert(accessUnit.end(),
-			                  pkt->begin() + rtpHeaderSize + sizeof(NalUnitHeader) +
-			                      sizeof(NalUnitFragmentHeader),
-			                  pkt->end());
-		} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
-			addSeparator(accessUnit);
-			accessUnit.insert(accessUnit.end(), pkt->begin() + rtpHeaderSize, pkt->end());
-		} else if (nalUnitHeader.unitType() == naluTypeSTAPA) {
-			auto currOffset = rtpHeaderSize + sizeof(NalUnitHeader);
+			// RFC 6184: If a fragmentation unit is lost, the receiver SHOULD discard all following
+			// fragmentation units in transmission order corresponding to the same fragmented NAL
+			// unit.
+			if (continuousFragments) {
+				frame.insert(frame.end(), packet->begin() + rtpHeaderSize + 2,
+				             packet->end() - paddingSize);
+			}
 
-			while (currOffset + sizeof(uint16_t) < pkt->size()) {
-				auto naluSize = std::to_integer<uint16_t>(pkt->at(currOffset)) << 8 |
-				                std::to_integer<uint16_t>(pkt->at(currOffset + 1));
+			// RFC 6184: When set to one, the End bit indicates the end of a fragmented NAL unit,
+			// i.e., the last byte of the payload is also the last byte of the fragmented NAL unit.
+			// When the following FU payload is not the last fragment of a fragmented NAL unit, the
+			// End bit is set to zero.
+			if (nalUnitFragmentHeader.isEnd())
+				continuousFragments = false;
 
-				currOffset += sizeof(uint16_t);
+		} else {
+			continuousFragments = false;
 
-				if (pkt->size() < currOffset + naluSize) {
-					throw std::runtime_error("H264 STAP-A declared size is larger than buffer");
-				}
+			if (nalUnitHeader.unitType() == naluTypeSTAPA) {
+				auto offset = rtpHeaderSize + 1;
 
-				addSeparator(accessUnit);
-				accessUnit.insert(accessUnit.end(), pkt->begin() + currOffset,
-				                  pkt->begin() + currOffset + naluSize);
+				while (offset + 2 < packet->size() - paddingSize) {
+					auto naluSize = std::to_integer<uint16_t>(packet->at(offset)) << 8 |
+					                std::to_integer<uint16_t>(packet->at(offset + 1));
 
-				currOffset += naluSize;
-			}
-		} else {
-			throw std::runtime_error("Unknown H264 RTP Packetization");
-		}
-	}
+					offset += 2;
 
-	if (!accessUnit.empty()) {
-		auto frameInfo = std::make_shared<FrameInfo>(timestamp);
-		frameInfo->timestampSeconds = std::chrono::duration<double>(double(timestamp) / double(ClockRate));
-		frameInfo->payloadType = payloadType;
-		out.emplace_back(make_message(std::move(accessUnit), frameInfo));
-	}
+					if (offset + naluSize > packet->size() - paddingSize)
+						throw std::runtime_error("H264 STAP-A size is larger than payload");
 
-	return out;
-}
+					addSeparator(frame);
+					frame.insert(frame.end(), packet->begin() + offset,
+					             packet->begin() + offset + naluSize);
 
-void H264RtpDepacketizer::incoming(message_vector &messages, const message_callback &) {
-	messages.erase(std::remove_if(messages.begin(), messages.end(),
-	                              [&](message_ptr message) {
-		                              if (message->type == Message::Control) {
-			                              return false;
-		                              }
-
-		                              if (message->size() < sizeof(RtpHeader)) {
-			                              PLOG_VERBOSE << "RTP packet is too small, size="
-			                                           << message->size();
-			                              return true;
-		                              }
-
-		                              mRtpBuffer.push_back(std::move(message));
-		                              return true;
-	                              }),
-	               messages.end());
-
-	while (mRtpBuffer.size() != 0) {
-		uint8_t payload_type = 0;
-		uint32_t current_timestamp = 0;
-		size_t packets_in_timestamp = 0;
-
-		for (const auto &pkt : mRtpBuffer) {
-			auto p = reinterpret_cast<const rtc::RtpHeader *>(pkt->data());
-
-			if (current_timestamp == 0) {
-				current_timestamp = p->timestamp();
-				payload_type =
-				    p->payloadType(); // should all be the same for data of the same codec
-			} else if (current_timestamp != p->timestamp()) {
-				break;
-			}
+					offset += naluSize;
+				}
 
-			packets_in_timestamp++;
-		}
+			} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
+				addSeparator(frame);
+				frame.insert(frame.end(), packet->begin() + rtpHeaderSize,
+				             packet->end() - paddingSize);
 
-		if (packets_in_timestamp == mRtpBuffer.size()) {
-			break;
+			} else {
+				throw std::runtime_error("Unknown H264 RTP Packetization");
+			}
 		}
+	}
 
-		auto begin = mRtpBuffer.begin();
-		auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1);
+	return make_message(std::move(frame), createFrameInfo(timestamp, payloadType));
+}
 
-		auto frames = buildFrames(begin, end + 1, payload_type, current_timestamp);
-		messages.insert(messages.end(), frames.begin(), frames.end());
-		mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp);
+void H264RtpDepacketizer::addSeparator(binary &frame) {
+	switch (mSeparator) {
+	case Separator::StartSequence:
+		[[fallthrough]];
+	case Separator::LongStartSequence:
+		frame.insert(frame.end(), naluLongStartCode.begin(), naluLongStartCode.end());
+		break;
+	case Separator::ShortStartSequence:
+		frame.insert(frame.end(), naluShortStartCode.begin(), naluShortStartCode.end());
+		break;
+	default:
+		throw std::invalid_argument("Invalid separator");
 	}
 }
 

+ 103 - 131
src/h265rtpdepacketizer.cpp

@@ -24,168 +24,140 @@ const binary naluShortStartCode = {byte{0}, byte{0}, byte{1}};
 const uint8_t naluTypeAP = 48;
 const uint8_t naluTypeFU = 49;
 
-H265RtpDepacketizer::H265RtpDepacketizer(Separator separator) : separator(separator) {
-	switch (separator) {
-	case Separator::StartSequence: [[fallthrough]];
-	case Separator::LongStartSequence: [[fallthrough]];
-	case Separator::ShortStartSequence:
-		break;
-	case Separator::Length: [[fallthrough]];
-	default:
-		throw std::invalid_argument("Invalid separator");
+H265RtpDepacketizer::H265RtpDepacketizer(Separator separator) : mSeparator(separator) {
+	if (separator != Separator::StartSequence && separator != Separator::LongStartSequence &&
+	    separator != Separator::ShortStartSequence) {
+		throw std::invalid_argument("Unimplemented H265 separator");
 	}
 }
 
-void H265RtpDepacketizer::addSeparator(binary& accessUnit)
-{
-	switch (separator) {
-	case Separator::StartSequence: [[fallthrough]];
-	case Separator::LongStartSequence:
-		accessUnit.insert(accessUnit.end(),
-		                  naluLongStartCode.begin(),
-		                  naluLongStartCode.end());
-		break;
-	case Separator::ShortStartSequence:
-		accessUnit.insert(accessUnit.end(),
-		                  naluShortStartCode.begin(),
-		                  naluShortStartCode.end());
-		break;
-	case Separator::Length: [[fallthrough]];
-	default:
-		throw std::invalid_argument("Invalid separator");
-	}
-}
-
-message_vector H265RtpDepacketizer::buildFrames(message_vector::iterator begin,
-                                                message_vector::iterator end,
-                                                uint8_t payloadType, uint32_t timestamp) {
-	message_vector out;
-	binary accessUnit;
+H265RtpDepacketizer::~H265RtpDepacketizer() {}
 
-	for (auto it = begin; it != end; ++it) {
-		auto pkt = it->get();
-		auto pktParsed = reinterpret_cast<const rtc::RtpHeader *>(pkt->data());
-		auto rtpHeaderSize = pktParsed->getSize() + pktParsed->getExtensionHeaderSize();
-		auto rtpPaddingSize = 0;
+message_ptr H265RtpDepacketizer::reassemble(message_buffer &buffer) {
+	if (buffer.empty())
+		return nullptr;
 
-		if (pktParsed->padding()) {
-			rtpPaddingSize = std::to_integer<uint8_t>(pkt->at(pkt->size() - 1));
-		}
+	auto first = *buffer.begin();
+	auto firstRtpHeader = reinterpret_cast<const RtpHeader *>(first->data());
+	uint8_t payloadType = firstRtpHeader->payloadType();
+	uint32_t timestamp = firstRtpHeader->timestamp();
+	uint16_t nextSeqNumber = firstRtpHeader->seqNumber();
 
-		if (pkt->size() == rtpHeaderSize + rtpPaddingSize) {
-			PLOG_VERBOSE << "H.265 RTP packet has empty payload";
+	binary frame;
+	bool continuousFragments = false;
+	for (const auto &packet : buffer) {
+		auto rtpHeader = reinterpret_cast<const rtc::RtpHeader *>(packet->data());
+		if (rtpHeader->seqNumber() < nextSeqNumber) {
+			// Skip
 			continue;
 		}
+		if (rtpHeader->seqNumber() > nextSeqNumber) {
+			// Missing packet(s)
+			continuousFragments = false;
+		}
+
+		nextSeqNumber = rtpHeader->seqNumber() + 1;
+
+		auto rtpHeaderSize = rtpHeader->getSize() + rtpHeader->getExtensionHeaderSize();
+		auto paddingSize = 0;
+		if (rtpHeader->padding())
+			paddingSize = std::to_integer<uint8_t>(packet->back());
+
+		if (packet->size() <= rtpHeaderSize + paddingSize)
+			continue; // Empty payload
+
+		size_t payloadSize = packet->size() - (rtpHeaderSize + paddingSize);
+		if (payloadSize < 2)
+			throw std::runtime_error("Truncated H265 NAL unit");
 
 		auto nalUnitHeader =
-		    H265NalUnitHeader{std::to_integer<uint8_t>(pkt->at(rtpHeaderSize)),
-		                      std::to_integer<uint8_t>(pkt->at(rtpHeaderSize + 1))};
+		    H265NalUnitHeader{std::to_integer<uint8_t>(packet->at(rtpHeaderSize)),
+		                      std::to_integer<uint8_t>(packet->at(rtpHeaderSize + 1))};
 
 		if (nalUnitHeader.unitType() == naluTypeFU) {
-			auto nalUnitFragmentHeader = H265NalUnitFragmentHeader{
-			    std::to_integer<uint8_t>(pkt->at(rtpHeaderSize + sizeof(H265NalUnitHeader)))};
+			if (payloadSize <= 2)
+				continue; // Empty FU
 
-			// RFC 7798: "When set to 1, the S bit indicates the start of a fragmented
+			auto nalUnitFragmentHeader =
+			    H265NalUnitFragmentHeader{std::to_integer<uint8_t>(packet->at(rtpHeaderSize + 2))};
+
+			// RFC 7798: When set to 1, the S bit indicates the start of a fragmented
 			// NAL unit, i.e., the first byte of the FU payload is also the first byte of
 			// the payload of the fragmented NAL unit. When the FU payload is not the start
-			// of the fragmented NAL unit payload, the S bit MUST be set to 0."
-			if (nalUnitFragmentHeader.isStart() || accessUnit.empty()) {
-				addSeparator(accessUnit);
+			// of the fragmented NAL unit payload, the S bit MUST be set to 0.
+			if (nalUnitFragmentHeader.isStart()) {
+				addSeparator(frame);
 				nalUnitHeader.setUnitType(nalUnitFragmentHeader.unitType());
-				accessUnit.emplace_back(byte(nalUnitHeader._first));
-				accessUnit.emplace_back(byte(nalUnitHeader._second));
+				frame.emplace_back(byte(nalUnitHeader._first));
+				frame.emplace_back(byte(nalUnitHeader._second));
+				continuousFragments = true;
 			}
 
-			accessUnit.insert(accessUnit.end(),
-			                  pkt->begin() + rtpHeaderSize + sizeof(H265NalUnitHeader) +
-			                      sizeof(H265NalUnitFragmentHeader),
-			                  pkt->end());
-		} else if (nalUnitHeader.unitType() == naluTypeAP) {
-			auto currOffset = rtpHeaderSize + sizeof(H265NalUnitHeader);
+			// RFC 7798: If an FU is lost, the receiver SHOULD discard all following fragmentation
+			// units in transmission order corresponding to the same fragmented NAL unit
+			if (continuousFragments) {
+				frame.insert(frame.end(), packet->begin() + rtpHeaderSize + 3,
+				             packet->end() - paddingSize);
+			}
 
-			while (currOffset + sizeof(uint16_t) < pkt->size()) {
-				auto naluSize = std::to_integer<uint16_t>(pkt->at(currOffset)) << 8 |
-				                std::to_integer<uint16_t>(pkt->at(currOffset + 1));
+			// RFC 7798: When set to 1, the E bit indicates the end of a fragmented NAL unit, i.e.,
+			// the last byte of the payload is also the last byte of the fragmented NAL unit.  When
+			// the FU payload is not the last fragment of a fragmented NAL unit, the E bit MUST be
+			// set to 0.
+			if (nalUnitFragmentHeader.isEnd())
+				continuousFragments = false;
 
-				currOffset += sizeof(uint16_t);
+		} else {
+			continuousFragments = false;
 
-				if (pkt->size() < currOffset + naluSize) {
-					throw std::runtime_error("H265 AP declared size is larger than buffer");
-				}
+			if (nalUnitHeader.unitType() == naluTypeAP) {
+				auto offset = rtpHeaderSize + 2;
 
-				addSeparator(accessUnit);
-				accessUnit.insert(accessUnit.end(), pkt->begin() + currOffset,
-				                  pkt->begin() + currOffset + naluSize);
+				while (offset + 2 < packet->size() - paddingSize) {
+					auto naluSize = std::to_integer<uint16_t>(packet->at(offset)) << 8 |
+					                std::to_integer<uint16_t>(packet->at(offset + 1));
 
-				currOffset += naluSize;
-			}
-		} else if (nalUnitHeader.unitType() < naluTypeAP) {
-			// "NAL units with NAL unit type values in the range of 0 to 47, inclusive, may be
-			// passed to the decoder."
-			addSeparator(accessUnit);
-			accessUnit.insert(accessUnit.end(), pkt->begin() + rtpHeaderSize, pkt->end());
-		} else {
-			// "NAL-unit-like structures with NAL unit type values in the range of 48 to 63,
-			// inclusive, MUST NOT be passed to the decoder."
-		}
-	}
+					offset += 2;
 
-	if (!accessUnit.empty()) {
-		auto frameInfo = std::make_shared<FrameInfo>(timestamp);
-		frameInfo->timestampSeconds = std::chrono::duration<double>(double(timestamp) / double(ClockRate));
-		frameInfo->payloadType = payloadType;
-		out.emplace_back(make_message(std::move(accessUnit), frameInfo));
-	}
+					if (offset + naluSize > packet->size() - paddingSize)
+						throw std::runtime_error("H265 STAP size is larger than payload");
 
-	return out;
-}
+					addSeparator(frame);
+					frame.insert(frame.end(), packet->begin() + offset,
+					             packet->begin() + offset + naluSize);
 
-void H265RtpDepacketizer::incoming(message_vector &messages, const message_callback &) {
-	messages.erase(std::remove_if(messages.begin(), messages.end(),
-	                              [&](message_ptr message) {
-		                              if (message->type == Message::Control) {
-			                              return false;
-		                              }
-
-		                              if (message->size() < sizeof(RtpHeader)) {
-			                              PLOG_VERBOSE << "RTP packet is too small, size="
-			                                           << message->size();
-			                              return true;
-		                              }
-
-		                              mRtpBuffer.push_back(std::move(message));
-		                              return true;
-	                              }),
-	               messages.end());
-
-	while (mRtpBuffer.size() != 0) {
-		uint8_t payload_type = 0;
-		uint32_t current_timestamp = 0;
-		size_t packets_in_timestamp = 0;
-
-		for (const auto &pkt : mRtpBuffer) {
-			auto p = reinterpret_cast<const rtc::RtpHeader *>(pkt->data());
-
-			if (current_timestamp == 0) {
-				current_timestamp = p->timestamp();
-				payload_type = p->payloadType(); // should all be the same for data of the same codec
-			} else if (current_timestamp != p->timestamp()) {
-				break;
-			}
+					offset += naluSize;
+				}
 
-			packets_in_timestamp++;
-		}
+			} else if (nalUnitHeader.unitType() < 47) {
+				// RFC 7798: NAL units with NAL unit type values in the range of 0 to 47, inclusive,
+				// may be passed to the decoder.
+				addSeparator(frame);
+				frame.insert(frame.end(), packet->begin() + rtpHeaderSize,
+				             packet->end() - paddingSize);
 
-		if (packets_in_timestamp == mRtpBuffer.size()) {
-			break;
+			} else {
+				// RFC 7798: NAL-unit-like structures with NAL unit type values in the range of 48
+				// to 63, inclusive, MUST NOT be passed to the decoder.
+			}
 		}
+	}
 
-		auto begin = mRtpBuffer.begin();
-		auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1);
+	return make_message(std::move(frame), createFrameInfo(timestamp, payloadType));
+}
 
-		auto frames = buildFrames(begin, end + 1, payload_type, current_timestamp);
-		messages.insert(messages.end(), frames.begin(), frames.end());
-		mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp);
+void H265RtpDepacketizer::addSeparator(binary &frame) {
+	switch (mSeparator) {
+	case Separator::StartSequence:
+		[[fallthrough]];
+	case Separator::LongStartSequence:
+		frame.insert(frame.end(), naluLongStartCode.begin(), naluLongStartCode.end());
+		break;
+	case Separator::ShortStartSequence:
+		frame.insert(frame.end(), naluShortStartCode.begin(), naluShortStartCode.end());
+		break;
+	default:
+		throw std::invalid_argument("Invalid separator");
 	}
 }
 

+ 65 - 11
src/rtpdepacketizer.cpp

@@ -13,9 +13,6 @@
 
 #include "impl/logcounter.hpp"
 
-#include <cmath>
-#include <cstring>
-
 namespace rtc {
 
 RtpDepacketizer::RtpDepacketizer() : mClockRate(0) {}
@@ -24,7 +21,7 @@ RtpDepacketizer::RtpDepacketizer(uint32_t clockRate) : mClockRate(clockRate) {}
 
 RtpDepacketizer::~RtpDepacketizer() {}
 
-void RtpDepacketizer::incoming([[maybe_unused]] message_vector &messages,
+void RtpDepacketizer::incoming(message_vector &messages,
                                [[maybe_unused]] const message_callback &send) {
 	message_vector result;
 	for (auto &message : messages) {
@@ -40,18 +37,75 @@ void RtpDepacketizer::incoming([[maybe_unused]] message_vector &messages,
 
 		auto pkt = reinterpret_cast<const rtc::RtpHeader *>(message->data());
 		auto headerSize = sizeof(rtc::RtpHeader) + pkt->csrcCount() + pkt->getExtensionHeaderSize();
-
-		auto frameInfo = std::make_shared<FrameInfo>(pkt->timestamp());
-		if (mClockRate > 0)
-			frameInfo->timestampSeconds =
-			    std::chrono::duration<double>(double(pkt->timestamp()) / double(mClockRate));
-		frameInfo->payloadType = pkt->payloadType();
-		result.push_back(make_message(message->begin() + headerSize, message->end(), frameInfo));
+		result.push_back(make_message(message->begin() + headerSize, message->end(),
+		                              createFrameInfo(pkt->timestamp(), pkt->payloadType())));
 	}
 
 	messages.swap(result);
 }
 
+shared_ptr<FrameInfo> RtpDepacketizer::createFrameInfo(uint32_t timestamp,
+                                                       uint8_t payloadType) const {
+	auto frameInfo = std::make_shared<FrameInfo>(timestamp);
+	if (mClockRate > 0)
+		frameInfo->timestampSeconds =
+		    std::chrono::duration<double>(double(timestamp) / double(mClockRate));
+	frameInfo->payloadType = payloadType;
+	return frameInfo;
+}
+
+VideoRtpDepacketizer::VideoRtpDepacketizer() : RtpDepacketizer(ClockRate) {}
+
+VideoRtpDepacketizer::~VideoRtpDepacketizer() {}
+
+void VideoRtpDepacketizer::incoming(message_vector &messages,
+                                    [[maybe_unused]] const message_callback &send) {
+	message_vector result;
+	for (auto message : messages) {
+		if (message->type == Message::Control) {
+			result.push_back(std::move(message));
+			continue;
+		}
+
+		if (message->size() < sizeof(RtpHeader)) {
+			PLOG_VERBOSE << "RTP packet is too small, size=" << message->size();
+			continue;
+		}
+
+		auto header = reinterpret_cast<const RtpHeader *>(message->data());
+
+		if (!mBuffer.empty()) {
+			auto first = *mBuffer.begin();
+			auto firstHeader = reinterpret_cast<const RtpHeader *>(first->data());
+			if (firstHeader->timestamp() != header->timestamp()) {
+				if (auto frame = reassemble(mBuffer))
+					result.push_back(frame);
+
+				mBuffer.clear();
+			}
+		}
+
+		mBuffer.insert(std::move(message));
+
+		if (header->marker()) {
+			if (auto frame = reassemble(mBuffer))
+				result.push_back(std::move(frame));
+
+			mBuffer.clear();
+		}
+	};
+
+	messages.swap(result);
+}
+
+bool VideoRtpDepacketizer::sequence_cmp::operator()(message_ptr a, message_ptr b) const {
+	assert(a->size() >= sizeof(RtpHeader) && b->size() >= sizeof(RtpHeader));
+	auto ha = reinterpret_cast<const rtc::RtpHeader *>(a->data());
+	auto hb = reinterpret_cast<const rtc::RtpHeader *>(b->data());
+	int16_t d = int16_t(hb->seqNumber() - ha->seqNumber());
+	return d > 0;
+}
+
 } // namespace rtc
 
 #endif /* RTC_ENABLE_MEDIA */