Browse Source

Add FrameInfo

Depacketizers will include metadata about assembled media.
Sean DuBois 1 year ago
parent
commit
9d6671c936

+ 1 - 0
CMakeLists.txt

@@ -99,6 +99,7 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/common.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/global.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/message.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/frameinfo.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/peerconnection.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h

+ 23 - 0
include/rtc/frameinfo.hpp

@@ -0,0 +1,23 @@
+/**
+ * Copyright (c) 2019-2020 Paul-Louis Ageneau
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef RTC_FRAMEINFO_H
+#define RTC_FRAMEINFO_H
+
+#include "common.hpp"
+
+namespace rtc {
+
+struct RTC_CPP_EXPORT FrameInfo {
+	FrameInfo(uint32_t timestamp) : timestamp(timestamp){};
+	uint32_t timestamp = 0; // RTP Timestamp
+};
+
+} // namespace rtc
+
+#endif // RTC_FRAMEINFO_H

+ 2 - 1
include/rtc/h264rtpdepacketizer.hpp

@@ -32,7 +32,8 @@ public:
 private:
 	std::vector<message_ptr> mRtpBuffer;
 
-	message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt);
+	message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt,
+	                           uint32_t timestamp);
 };
 
 } // namespace rtc

+ 7 - 2
include/rtc/message.hpp

@@ -10,6 +10,7 @@
 #define RTC_MESSAGE_H
 
 #include "common.hpp"
+#include "frameinfo.hpp"
 #include "reliability.hpp"
 
 #include <functional>
@@ -32,6 +33,7 @@ struct RTC_CPP_EXPORT Message : binary {
 	unsigned int stream = 0; // Stream id (SCTP stream or SSRC)
 	unsigned int dscp = 0;   // Differentiated Services Code Point
 	shared_ptr<Reliability> reliability;
+	shared_ptr<FrameInfo> frameInfo;
 };
 
 using message_ptr = shared_ptr<Message>;
@@ -44,10 +46,12 @@ inline size_t message_size_func(const message_ptr &m) {
 
 template <typename Iterator>
 message_ptr make_message(Iterator begin, Iterator end, Message::Type type = Message::Binary,
-                         unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr) {
+                         unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr,
+                         shared_ptr<FrameInfo> frameInfo = nullptr) {
 	auto message = std::make_shared<Message>(begin, end, type);
 	message->stream = stream;
 	message->reliability = reliability;
+	message->frameInfo = frameInfo;
 	return message;
 }
 
@@ -57,7 +61,8 @@ RTC_CPP_EXPORT message_ptr make_message(size_t size, Message::Type type = Messag
 
 RTC_CPP_EXPORT message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
                                         unsigned int stream = 0,
-                                        shared_ptr<Reliability> reliability = nullptr);
+                                        shared_ptr<Reliability> reliability = nullptr,
+                                        shared_ptr<FrameInfo> frameInfo = nullptr);
 
 RTC_CPP_EXPORT message_ptr make_message(size_t size, message_ptr orig);
 

+ 2 - 0
include/rtc/track.hpp

@@ -41,6 +41,8 @@ public:
 	bool isClosed(void) const override;
 	size_t maxMessageSize() const override;
 
+	void onFrame(std::function<void(binary data, FrameInfo frame)> callback);
+
 	bool requestKeyframe();
 	bool requestBitrate(unsigned int bitrate);
 

+ 26 - 21
src/h264rtpdepacketizer.cpp

@@ -32,9 +32,10 @@ const uint8_t naluTypeSTAPA = 24;
 const uint8_t naluTypeFUA = 28;
 
 message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
-                                                message_vector::iterator end) {
+                                                message_vector::iterator end, uint32_t timestamp) {
 	message_vector out = {};
 	auto fua_buffer = std::vector<std::byte>{};
+	auto frameInfo = std::make_shared<FrameInfo>(timestamp);
 
 	for (auto it = begin; it != end; it++) {
 		auto pkt = it->get();
@@ -58,11 +59,13 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
 				fua_buffer.at(0) =
 				    std::byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType());
 
-				out.push_back(make_message(std::move(fua_buffer)));
+				out.push_back(
+				    make_message(std::move(fua_buffer), Message::Binary, 0, nullptr, frameInfo));
 				fua_buffer.clear();
 			}
 		} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
-			out.push_back(make_message(pkt->begin() + headerSize, pkt->end()));
+			out.push_back(make_message(pkt->begin() + headerSize, pkt->end(), Message::Binary, 0,
+			                           nullptr, frameInfo));
 		} else if (nalUnitHeader.unitType() == naluTypeSTAPA) {
 			auto currOffset = stapaHeaderSize + headerSize;
 
@@ -76,11 +79,11 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
 					throw std::runtime_error("STAP-A declared size is larger then buffer");
 				}
 
-				out.push_back(
-				    make_message(pkt->begin() + currOffset, pkt->begin() + currOffset + naluSize));
+				out.push_back(make_message(pkt->begin() + currOffset,
+				                           pkt->begin() + currOffset + naluSize, Message::Binary, 0,
+				                           nullptr, frameInfo));
 				currOffset += naluSize;
 			}
-
 		} else {
 			throw std::runtime_error("Unknown H264 RTP Packetization");
 		}
@@ -90,20 +93,22 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
 }
 
 void H264RtpDepacketizer::incoming(message_vector &messages, const message_callback &) {
-	for (auto message : messages) {
-		if (message->type == Message::Control) {
-			continue; // RTCP
-		}
-
-		if (message->size() < sizeof(RtpHeader)) {
-			PLOG_VERBOSE << "RTP packet is too small, size=" << message->size();
-			continue;
-		}
-
-		mRtpBuffer.push_back(message);
-	}
-
-	messages.clear();
+	messages.erase(std::remove_if(messages.begin(), messages.end(),
+	                              [&](message_ptr message) {
+		                              if (message->type == Message::Control) {
+			                              return false;
+		                              }
+
+		                              if (message->size() < sizeof(RtpHeader)) {
+			                              PLOG_VERBOSE << "RTP packet is too small, size="
+			                                           << message->size();
+			                              return true;
+		                              }
+
+		                              mRtpBuffer.push_back(std::move(message));
+		                              return true;
+	                              }),
+	               messages.end());
 
 	while (mRtpBuffer.size() != 0) {
 		uint32_t current_timestamp = 0;
@@ -128,7 +133,7 @@ void H264RtpDepacketizer::incoming(message_vector &messages, const message_callb
 		auto begin = mRtpBuffer.begin();
 		auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1);
 
-		auto frames = buildFrames(begin, end + 1);
+		auto frames = buildFrames(begin, end + 1, current_timestamp);
 		messages.insert(messages.end(), frames.begin(), frames.end());
 		mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp);
 	}

+ 2 - 2
src/impl/channel.hpp

@@ -28,7 +28,7 @@ struct Channel {
 	virtual void triggerAvailable(size_t count);
 	virtual void triggerBufferedAmount(size_t amount);
 
-	void flushPendingMessages();
+	virtual void flushPendingMessages();
 	void resetOpenCallback();
 	void resetCallbacks();
 
@@ -43,7 +43,7 @@ struct Channel {
 	std::atomic<size_t> bufferedAmount = 0;
 	std::atomic<size_t> bufferedAmountLowThreshold = 0;
 
-private:
+protected:
 	std::atomic<bool> mOpenTriggered = false;
 };
 

+ 37 - 11
src/impl/track.cpp

@@ -75,24 +75,23 @@ void Track::close() {
 	resetCallbacks();
 }
 
+message_variant Track::trackMessageToVariant(message_ptr message) {
+	if (message->type == Message::Control)
+		return to_variant(*message); // The same message may be frowarded into multiple Tracks
+	else
+		return to_variant(std::move(*message));
+}
+
 optional<message_variant> Track::receive() {
 	if (auto next = mRecvQueue.pop()) {
-		message_ptr message = *next;
-		if (message->type == Message::Control)
-			return to_variant(**next); // The same message may be frowarded into multiple Tracks
-		else
-			return to_variant(std::move(*message));
+		return trackMessageToVariant(*next);
 	}
 	return nullopt;
 }
 
 optional<message_variant> Track::peek() {
 	if (auto next = mRecvQueue.peek()) {
-		message_ptr message = *next;
-		if (message->type == Message::Control)
-			return to_variant(**next); // The same message may be forwarded into multiple Tracks
-		else
-			return to_variant(std::move(*message));
+		return trackMessageToVariant(*next);
 	}
 	return nullopt;
 }
@@ -217,7 +216,7 @@ void Track::setMediaHandler(shared_ptr<MediaHandler> handler) {
 		mMediaHandler = handler;
 	}
 
-	if(handler)
+	if (handler)
 		handler->media(description());
 }
 
@@ -226,4 +225,31 @@ shared_ptr<MediaHandler> Track::getMediaHandler() {
 	return mMediaHandler;
 }
 
+void Track::onFrame(std::function<void(binary data, FrameInfo frame)> callback) {
+	frameCallback = callback;
+	flushPendingMessages();
+}
+
+void Track::flushPendingMessages() {
+	if (!mOpenTriggered)
+		return;
+
+	while (messageCallback || frameCallback) {
+		auto next = mRecvQueue.pop();
+		if (!next)
+			break;
+
+		auto message = next.value();
+		try {
+			if (message->frameInfo != nullptr && frameCallback) {
+				frameCallback(std::move(*message), std::move(*message->frameInfo));
+			} else if (message->frameInfo == nullptr && messageCallback) {
+				messageCallback(trackMessageToVariant(message));
+			}
+		} catch (const std::exception &e) {
+			PLOG_WARNING << "Uncaught exception in callback: " << e.what();
+		}
+	}
+}
+
 } // namespace rtc::impl

+ 6 - 0
src/impl/track.hpp

@@ -38,6 +38,10 @@ public:
 	optional<message_variant> receive() override;
 	optional<message_variant> peek() override;
 	size_t availableAmount() const override;
+	void flushPendingMessages() override;
+	message_variant trackMessageToVariant(message_ptr message);
+
+	void onFrame(std::function<void(binary data, FrameInfo frame)> callback);
 
 	bool isOpen() const;
 	bool isClosed() const;
@@ -71,6 +75,8 @@ private:
 	std::atomic<bool> mIsClosed = false;
 
 	Queue<message_ptr> mRecvQueue;
+
+	synchronized_callback<binary, FrameInfo> frameCallback;
 };
 
 } // namespace rtc::impl

+ 3 - 2
src/message.cpp

@@ -19,15 +19,16 @@ message_ptr make_message(size_t size, Message::Type type, unsigned int stream,
 }
 
 message_ptr make_message(binary &&data, Message::Type type, unsigned int stream,
-                         shared_ptr<Reliability> reliability) {
+                         shared_ptr<Reliability> reliability, shared_ptr<FrameInfo> frameInfo) {
 	auto message = std::make_shared<Message>(std::move(data), type);
 	message->stream = stream;
 	message->reliability = reliability;
+	message->frameInfo = frameInfo;
 	return message;
 }
 
 message_ptr make_message(size_t size, message_ptr orig) {
-	if(!orig)
+	if (!orig)
 		return nullptr;
 
 	auto message = std::make_shared<Message>(size, orig->type);

+ 4 - 0
src/track.cpp

@@ -70,4 +70,8 @@ bool Track::requestBitrate(unsigned int bitrate) {
 
 shared_ptr<MediaHandler> Track::getMediaHandler() { return impl()->getMediaHandler(); }
 
+void Track::onFrame(std::function<void(binary data, FrameInfo frame)> callback) {
+	impl()->onFrame(callback);
+}
+
 } // namespace rtc