Преглед изворни кода

Rewrite H264 RTP depacketization logic

Paul-Louis Ageneau пре 2 месеци
родитељ
комит
c115295cbf
2 измењених фајлова са 129 додато и 99 уклоњено
  1. 8 5
      include/rtc/h264rtpdepacketizer.hpp
  2. 121 94
      src/h264rtpdepacketizer.cpp

+ 8 - 5
include/rtc/h264rtpdepacketizer.hpp

@@ -18,7 +18,7 @@
 #include "nalunit.hpp"
 #include "rtp.hpp"
 
-#include <iterator>
+#include <set>
 
 namespace rtc {
 
@@ -35,12 +35,15 @@ public:
 	void incoming(message_vector &messages, const message_callback &send) override;
 
 private:
-	std::vector<message_ptr> mRtpBuffer;
+	void addSeparator(binary &accessUnit);
+	message_ptr buildFrame();
+
 	const NalUnit::Separator mSeparator;
 
-	void addSeparator(binary &accessUnit);
-	message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt,
-	                           uint8_t payloadType, uint32_t timestamp);
+	struct sequence_cmp {
+		bool operator() (message_ptr a, message_ptr b) const;
+    };
+	std::set<message_ptr, sequence_cmp> mBuffer;
 };
 
 } // namespace rtc

+ 121 - 94
src/h264rtpdepacketizer.cpp

@@ -14,6 +14,7 @@
 #include "impl/internals.hpp"
 
 #include <algorithm>
+#include <cassert>
 #include <chrono>
 
 namespace rtc {
@@ -24,6 +25,14 @@ const binary naluShortStartCode = {byte{0}, byte{0}, byte{1}};
 const uint8_t naluTypeSTAPA = 24;
 const uint8_t naluTypeFUA = 28;
 
+bool H264RtpDepacketizer::sequence_cmp::operator()(message_ptr a, message_ptr b) const {
+	assert(a->size() >= sizeof(RtpHeader) && b->size() >= sizeof(RtpHeader));
+	auto ha = reinterpret_cast<const rtc::RtpHeader *>(a->data());
+	auto hb = reinterpret_cast<const rtc::RtpHeader *>(b->data());
+	int16_t d = int16_t(hb->seqNumber() - ha->seqNumber());
+	return d > 0;
+}
+
 H264RtpDepacketizer::H264RtpDepacketizer(Separator separator) : mSeparator(separator) {
 	if (separator != Separator::StartSequence && separator != Separator::LongStartSequence &&
 	    separator != Separator::ShortStartSequence) {
@@ -41,131 +50,149 @@ void H264RtpDepacketizer::addSeparator(binary &accessUnit) {
 	}
 }
 
-message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
-                                                message_vector::iterator end, uint8_t payloadType,
-                                                uint32_t timestamp) {
-	message_vector out;
-	binary accessUnit;
+message_ptr H264RtpDepacketizer::buildFrame() {
+	if (mBuffer.empty())
+		return nullptr;
+
+	auto first = *mBuffer.begin();
+	auto firstRtpHeader = reinterpret_cast<const RtpHeader *>(first->data());
+	uint8_t payloadType = firstRtpHeader->payloadType();
+	uint32_t timestamp = firstRtpHeader->timestamp();
+	uint16_t nextSeqNumber = firstRtpHeader->seqNumber();
+
+	binary frame;
+	bool continuousFragments = false;
+	for (const auto &packet : mBuffer) {
+		auto rtpHeader = reinterpret_cast<const rtc::RtpHeader *>(packet->data());
+		if (rtpHeader->seqNumber() < nextSeqNumber) {
+			// Skip
+			continue; // skip
+		}
+		if (rtpHeader->seqNumber() > nextSeqNumber) {
+			// Missing packet(s)
+			continuousFragments = false;
+		}
 
-	for (auto it = begin; it != end; ++it) {
-		auto pkt = it->get();
-		auto pktParsed = reinterpret_cast<const rtc::RtpHeader *>(pkt->data());
-		auto rtpHeaderSize = pktParsed->getSize() + pktParsed->getExtensionHeaderSize();
-		auto rtpPaddingSize = 0;
+		nextSeqNumber = rtpHeader->seqNumber() + 1;
 
-		if (pktParsed->padding()) {
-			rtpPaddingSize = std::to_integer<uint8_t>(pkt->at(pkt->size() - 1));
-		}
+		auto rtpHeaderSize = rtpHeader->getSize() + rtpHeader->getExtensionHeaderSize();
+		auto paddingSize = 0;
+		if (rtpHeader->padding())
+			paddingSize = std::to_integer<uint8_t>(packet->back());
 
-		if (pkt->size() == rtpHeaderSize + rtpPaddingSize) {
-			PLOG_VERBOSE << "H.264 RTP packet has empty payload";
-			continue;
-		}
+		if (packet->size() <= rtpHeaderSize + paddingSize)
+			continue; // Empty payload
 
-		auto nalUnitHeader = NalUnitHeader{std::to_integer<uint8_t>(pkt->at(rtpHeaderSize))};
+		auto nalUnitHeader = NalUnitHeader{std::to_integer<uint8_t>(packet->at(rtpHeaderSize))};
 
 		if (nalUnitHeader.unitType() == naluTypeFUA) {
-			auto nalUnitFragmentHeader = NalUnitFragmentHeader{
-			    std::to_integer<uint8_t>(pkt->at(rtpHeaderSize + sizeof(NalUnitHeader)))};
+			if (packet->size() <= rtpHeaderSize + paddingSize + 1)
+				continue; // Empty FU-A
+
+			auto nalUnitFragmentHeader =
+			    NalUnitFragmentHeader{std::to_integer<uint8_t>(packet->at(rtpHeaderSize + 1))};
 
 			// RFC 6184: When set to one, the Start bit indicates the start of a fragmented NAL
 			// unit. When the following FU payload is not the start of a fragmented NAL unit
 			// payload, the Start bit is set to zero.
-			if (nalUnitFragmentHeader.isStart() || accessUnit.empty()) {
-				addSeparator(accessUnit);
-				accessUnit.emplace_back(
-				    byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType()));
+			if (nalUnitFragmentHeader.isStart()) {
+				addSeparator(frame);
+				frame.emplace_back(byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType()));
+				continuousFragments = true;
 			}
 
-			accessUnit.insert(accessUnit.end(),
-			                  pkt->begin() + rtpHeaderSize + sizeof(NalUnitHeader) +
-			                      sizeof(NalUnitFragmentHeader),
-			                  pkt->end());
-		} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
-			addSeparator(accessUnit);
-			accessUnit.insert(accessUnit.end(), pkt->begin() + rtpHeaderSize, pkt->end());
-		} else if (nalUnitHeader.unitType() == naluTypeSTAPA) {
-			auto currOffset = rtpHeaderSize + sizeof(NalUnitHeader);
+			// RFC 6184: If a fragmentation unit is lost, the receiver SHOULD discard all following
+			// fragmentation units in transmission order corresponding to the same fragmented NAL
+			// unit.
+			if (continuousFragments) {
+				frame.insert(frame.end(), packet->begin() + rtpHeaderSize + 2,
+				             packet->end() - paddingSize);
+			}
+
+			// RFC 6184: When set to one, the End bit indicates the end of a fragmented NAL unit,
+			// i.e., the last byte of the payload is also the last byte of the fragmented NAL unit.
+			// When the following FU payload is not the last fragment of a fragmented NAL unit, the
+			// End bit is set to zero.
+			if (nalUnitFragmentHeader.isEnd())
+				continuousFragments = false;
+
+		} else {
+			continuousFragments = false;
+
+			if (nalUnitHeader.unitType() == naluTypeSTAPA) {
+				auto offset = rtpHeaderSize + 1;
+
+				while (offset + 2 < packet->size() - paddingSize) {
+					auto naluSize = std::to_integer<uint16_t>(packet->at(offset)) << 8 |
+					                std::to_integer<uint16_t>(packet->at(offset + 1));
+
+					offset += 2;
 
-			while (currOffset + sizeof(uint16_t) < pkt->size()) {
-				auto naluSize = std::to_integer<uint16_t>(pkt->at(currOffset)) << 8 |
-				                std::to_integer<uint16_t>(pkt->at(currOffset + 1));
+					if (offset + naluSize > packet->size() - paddingSize)
+						throw std::runtime_error("H264 STAP-A size is larger than payload");
 
-				currOffset += sizeof(uint16_t);
+					addSeparator(frame);
+					frame.insert(frame.end(), packet->begin() + offset,
+					             packet->begin() + offset + naluSize);
 
-				if (pkt->size() < currOffset + naluSize) {
-					throw std::runtime_error("H264 STAP-A declared size is larger than buffer");
+					offset += naluSize;
 				}
 
-				addSeparator(accessUnit);
-				accessUnit.insert(accessUnit.end(), pkt->begin() + currOffset,
-				                  pkt->begin() + currOffset + naluSize);
+			} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
+				addSeparator(frame);
+				frame.insert(frame.end(), packet->begin() + rtpHeaderSize,
+				             packet->end() - paddingSize);
 
-				currOffset += naluSize;
+			} else {
+				throw std::runtime_error("Unknown H264 RTP Packetization");
 			}
-		} else {
-			throw std::runtime_error("Unknown H264 RTP Packetization");
 		}
 	}
 
-	if (!accessUnit.empty()) {
-		auto frameInfo = std::make_shared<FrameInfo>(timestamp);
-		frameInfo->timestampSeconds = std::chrono::duration<double>(double(timestamp) / double(ClockRate));
-		frameInfo->payloadType = payloadType;
-		out.emplace_back(make_message(std::move(accessUnit), frameInfo));
-	}
-
-	return out;
+	auto frameInfo = std::make_shared<FrameInfo>(timestamp);
+	frameInfo->timestampSeconds =
+	    std::chrono::duration<double>(double(timestamp) / double(ClockRate));
+	frameInfo->payloadType = payloadType;
+	return make_message(std::move(frame), std::move(frameInfo));
 }
 
 void H264RtpDepacketizer::incoming(message_vector &messages, const message_callback &) {
-	messages.erase(std::remove_if(messages.begin(), messages.end(),
-	                              [&](message_ptr message) {
-		                              if (message->type == Message::Control) {
-			                              return false;
-		                              }
-
-		                              if (message->size() < sizeof(RtpHeader)) {
-			                              PLOG_VERBOSE << "RTP packet is too small, size="
-			                                           << message->size();
-			                              return true;
-		                              }
-
-		                              mRtpBuffer.push_back(std::move(message));
-		                              return true;
-	                              }),
-	               messages.end());
-
-	while (mRtpBuffer.size() != 0) {
-		uint8_t payload_type = 0;
-		uint32_t current_timestamp = 0;
-		size_t packets_in_timestamp = 0;
-
-		for (const auto &pkt : mRtpBuffer) {
-			auto p = reinterpret_cast<const rtc::RtpHeader *>(pkt->data());
-
-			if (current_timestamp == 0) {
-				current_timestamp = p->timestamp();
-				payload_type =
-				    p->payloadType(); // should all be the same for data of the same codec
-			} else if (current_timestamp != p->timestamp()) {
-				break;
-			}
+	message_vector result;
+	for (auto message : messages) {
+		if (message->type == Message::Control) {
+			result.push_back(std::move(message));
+			continue;
+		}
 
-			packets_in_timestamp++;
+		if (message->size() < sizeof(RtpHeader)) {
+			PLOG_VERBOSE << "RTP packet is too small, size=" << message->size();
+			continue;
 		}
 
-		if (packets_in_timestamp == mRtpBuffer.size()) {
-			break;
+		auto header = reinterpret_cast<const RtpHeader *>(message->data());
+
+		if (!mBuffer.empty()) {
+			auto first = *mBuffer.begin();
+			auto firstHeader = reinterpret_cast<const RtpHeader *>(first->data());
+			if (firstHeader->timestamp() != header->timestamp()) {
+				if (auto frame = buildFrame())
+					result.push_back(frame);
+
+				mBuffer.clear();
+			}
 		}
 
-		auto begin = mRtpBuffer.begin();
-		auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1);
+		mBuffer.insert(std::move(message));
 
-		auto frames = buildFrames(begin, end + 1, payload_type, current_timestamp);
-		messages.insert(messages.end(), frames.begin(), frames.end());
-		mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp);
-	}
+		if (header->marker()) {
+			if (auto frame = buildFrame())
+				result.push_back(std::move(frame));
+
+			mBuffer.clear();
+		}
+	};
+
+	messages.swap(result);
 }
 
 } // namespace rtc