Browse Source

Merge pull request #318 from in2core/feature/chainable-rtcp-handlers

Chaining of multiple RTP/RTCP handlers
Paul-Louis Ageneau 4 years ago
parent
commit
882c605876

+ 11 - 3
CMakeLists.txt

@@ -67,13 +67,17 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizationconfig.cpp
-	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpsenderreporter.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpsrreporter.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizer.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/opusrtppacketizer.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/opuspacketizationhandler.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/h264rtppacketizer.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/nalunit.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/h264packetizationhandler.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/mediachainablehandler.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/mediahandlerelement.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/mediahandlerrootelement.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpnackresponder.cpp
 )
 
 set(LIBDATACHANNEL_WEBSOCKET_SOURCES
@@ -92,7 +96,7 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/configuration.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/datachannel.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/description.hpp
-	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcphandler.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/mediahandler.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpreceivingsession.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/include.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/init.hpp
@@ -107,13 +111,17 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/track.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/websocket.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizationconfig.hpp
-	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpsenderreporter.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpsrreporter.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizer.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/opusrtppacketizer.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/opuspacketizationhandler.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264rtppacketizer.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/nalunit.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264packetizationhandler.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/mediachainablehandler.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/mediahandlerelement.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/mediahandlerrootelement.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpnackresponder.hpp
 )
 
 set(TESTS_SOURCES

+ 1 - 1
examples/streamer/helpers.cpp

@@ -60,7 +60,7 @@ int gettimeofday(struct timeval *tv, struct timezone *tz)
 using namespace std;
 using namespace rtc;
 
-ClientTrackData::ClientTrackData(shared_ptr<Track> track, shared_ptr<RtcpSenderReporter> sender) {
+ClientTrackData::ClientTrackData(shared_ptr<Track> track, shared_ptr<RtcpSrReporter> sender) {
     this->track = track;
     this->sender = sender;
 }

+ 2 - 2
examples/streamer/helpers.hpp

@@ -23,9 +23,9 @@
 
 struct ClientTrackData {
     std::shared_ptr<rtc::Track> track;
-    std::shared_ptr<rtc::RtcpSenderReporter> sender;
+	std::shared_ptr<rtc::RtcpSrReporter> sender;
 
-    ClientTrackData(std::shared_ptr<rtc::Track> track, std::shared_ptr<rtc::RtcpSenderReporter> sender);
+	ClientTrackData(std::shared_ptr<rtc::Track> track, std::shared_ptr<rtc::RtcpSrReporter> sender);
 };
 
 struct Client {

+ 18 - 6
examples/streamer/main.cpp

@@ -219,13 +219,19 @@ shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const
     // create RTP configuration
     auto rtpConfig = shared_ptr<RtpPacketizationConfig>(new RtpPacketizationConfig(ssrc, cname, payloadType, H264RtpPacketizer::defaultClockRate));
     // create packetizer
-    auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(rtpConfig));
-    // create H264 and RTCP SP handler
-    shared_ptr<H264PacketizationHandler> h264Handler(new H264PacketizationHandler(H264PacketizationHandler::Separator::Length, packetizer));
+	auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(H264RtpPacketizer::Separator::Length, rtpConfig));
+	// create H264 handler
+	shared_ptr<H264PacketizationHandler> h264Handler(new H264PacketizationHandler(packetizer));
+	// add RTCP SR handler
+	auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
+	h264Handler->addToChain(srReporter);
+	// add RTCP NACK handler
+	auto nackResponder = make_shared<RtcpNackResponder>();
+	h264Handler->addToChain(nackResponder);
     // set handler
     track->setRtcpHandler(h264Handler);
     track->onOpen(onOpen);
-    auto trackData = make_shared<ClientTrackData>(track, h264Handler);
+    auto trackData = make_shared<ClientTrackData>(track, srReporter);
     return trackData;
 }
 
@@ -238,12 +244,18 @@ shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const
     auto rtpConfig = shared_ptr<RtpPacketizationConfig>(new RtpPacketizationConfig(ssrc, cname, payloadType, OpusRtpPacketizer::defaultClockRate));
     // create packetizer
     auto packetizer = make_shared<OpusRtpPacketizer>(rtpConfig);
-    // create opus and RTCP SP handler
+	// create opus handler
     auto opusHandler = make_shared<OpusPacketizationHandler>(packetizer);
+	// add RTCP SR handler
+	auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
+	opusHandler->addToChain(srReporter);
+	// add RTCP NACK handler
+	auto nackResponder = make_shared<RtcpNackResponder>();
+	opusHandler->addToChain(nackResponder);
     // set handler
     track->setRtcpHandler(opusHandler);
     track->onOpen(onOpen);
-    auto trackData = make_shared<ClientTrackData>(track, opusHandler);
+    auto trackData = make_shared<ClientTrackData>(track, srReporter);
     return trackData;
 }
 

+ 3 - 37
include/rtc/h264packetizationhandler.hpp

@@ -23,50 +23,16 @@
 
 #include "h264rtppacketizer.hpp"
 #include "nalunit.hpp"
-#include "rtcphandler.hpp"
-#include "rtcpsenderreporter.hpp"
+#include "mediachainablehandler.hpp"
 
 namespace rtc {
 
 /// Handler for H264 packetization
-class RTC_CPP_EXPORT H264PacketizationHandler : public RtcpHandler, public RtcpSenderReporter {
-	/// RTP packetizer for H264
-	const std::shared_ptr<H264RtpPacketizer> packetizer;
-
-	const uint16_t maximumFragmentSize;
-
-	std::shared_ptr<NalUnits> splitMessage(message_ptr message);
-
+class RTC_CPP_EXPORT H264PacketizationHandler : public MediaChainableHandler {
 public:
-	/// Nalunit separator
-	enum class Separator {
-		LongStartSequence,  // 0x00, 0x00, 0x00, 0x01
-		ShortStartSequence, // 0x00, 0x00, 0x01
-		StartSequence,      // LongStartSequence or ShortStartSequence
-		Length              // first 4 bytes is nal unit length
-	};
-
 	/// Construct handler for H264 packetization.
-	/// @param separator Nal units separator
 	/// @param packetizer RTP packetizer for h264
-	H264PacketizationHandler(Separator separator, std::shared_ptr<H264RtpPacketizer> packetizer,
-	                         uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
-
-	/// Returns message unchanged
-	/// @param ptr message
-	message_ptr incoming(message_ptr ptr) override;
-
-	/// Returns packetized message if message type is binary
-	/// @note NAL units in `ptr` message must be separated by `separator` given in constructor
-	/// @note If message generates multiple rtp packets, all but last are send using
-	/// `outgoingCallback`. It is your responsibility to send last packet.
-	/// @param ptr message containing all NAL units for current timestamp (one sample)
-	/// @return last packet
-	message_ptr outgoing(message_ptr ptr) override;
-
-private:
-	/// Separator
-	const Separator separator;
+	H264PacketizationHandler(std::shared_ptr<H264RtpPacketizer> packetizer);
 };
 
 } // namespace rtc

+ 23 - 2
include/rtc/h264rtppacketizer.hpp

@@ -21,22 +21,43 @@
 
 #if RTC_ENABLE_MEDIA
 
+#include "nalunit.hpp"
 #include "rtppacketizer.hpp"
+#include "mediahandlerrootelement.hpp"
 
 namespace rtc {
 
 /// RTP packetization of h264 payload
-class RTC_CPP_EXPORT H264RtpPacketizer : public RtpPacketizer {
+class RTC_CPP_EXPORT H264RtpPacketizer : public RtpPacketizer, public MediaHandlerRootElement {
+	std::shared_ptr<NalUnits> splitMessage(binary_ptr message);
+	const uint16_t maximumFragmentSize;
 
 public:
 	/// Default clock rate for H264 in RTP
 	static const auto defaultClockRate = 90 * 1000;
 
+	/// Nalunit separator
+	enum class Separator {
+		LongStartSequence,  // 0x00, 0x00, 0x00, 0x01
+		ShortStartSequence, // 0x00, 0x00, 0x01
+		StartSequence,      // LongStartSequence or ShortStartSequence
+		Length              // first 4 bytes is nal unit length
+	};
+
+	H264RtpPacketizer(H264RtpPacketizer::Separator separator, std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+					  uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
+
 	/// Constructs h264 payload packetizer with given RTP configuration.
 	/// @note RTP configuration is used in packetization process which may change some configuration
 	/// properties such as sequence number.
 	/// @param rtpConfig  RTP configuration
-	H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
+	/// @param maximumFragmentSize maximum size of one NALU fragment
+	H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+					  uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
+
+	ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
+private:
+	const Separator separator;
 };
 
 } // namespace rtc

+ 1 - 0
include/rtc/include.hpp

@@ -56,6 +56,7 @@ using std::byte;
 using std::string;
 using std::string_view;
 using binary = std::vector<byte>;
+using binary_ptr = std::shared_ptr<binary>;
 
 using std::nullopt;
 

+ 55 - 0
include/rtc/mediachainablehandler.hpp

@@ -0,0 +1,55 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTC_MEDIA_CHAINABLE_HANDLER_H
+#define RTC_MEDIA_CHAINABLE_HANDLER_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "mediahandler.hpp"
+#include "mediahandlerrootelement.hpp"
+
+namespace rtc {
+
+class RTC_CPP_EXPORT MediaChainableHandler : public MediaHandler {
+	const std::shared_ptr<MediaHandlerRootElement> root;
+	std::shared_ptr<MediaHandlerElement> leaf;
+	std::mutex inoutMutex;
+
+	message_ptr handleIncomingBinary(message_ptr);
+	message_ptr handleIncomingControl(message_ptr);
+	message_ptr handleOutgoingBinary(message_ptr);
+	message_ptr handleOutgoingControl(message_ptr);
+	bool sendProduct(ChainedOutgoingProduct product);
+public:
+	MediaChainableHandler(std::shared_ptr<MediaHandlerRootElement> root);
+	~MediaChainableHandler();
+	message_ptr incoming(message_ptr ptr) override;
+	message_ptr outgoing(message_ptr ptr) override;
+
+	bool send(message_ptr msg);
+
+	/// Adds element to chain
+	/// @param chainable Chainable element
+    void addToChain(std::shared_ptr<MediaHandlerElement> chainable);
+};
+
+} // namespace rtc
+
+#endif // RTC_ENABLE_MEDIA
+
+#endif // RTC_MEDIA_CHAINABLE_HANDLER_H

+ 4 - 4
include/rtc/rtcphandler.hpp → include/rtc/mediahandler.hpp

@@ -17,15 +17,15 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
-#ifndef RTC_RTCP_HANDLER_H
-#define RTC_RTCP_HANDLER_H
+#ifndef RTC_MEDIA_HANDLER_H
+#define RTC_MEDIA_HANDLER_H
 
 #include "include.hpp"
 #include "message.hpp"
 
 namespace rtc {
 
-class RTC_CPP_EXPORT RtcpHandler {
+class RTC_CPP_EXPORT MediaHandler {
 protected:
 	// Use this callback when trying to send custom data (such as RTCP) to the client.
 	synchronized_callback<message_ptr> outgoingCallback;
@@ -47,4 +47,4 @@ public:
 
 } // namespace rtc
 
-#endif // RTC_RTCP_HANDLER_H
+#endif // RTC_MEDIA_HANDLER_H

+ 111 - 0
include/rtc/mediahandlerelement.hpp

@@ -0,0 +1,111 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTC_MEDIA_HANDLER_ELEMENT_H
+#define RTC_MEDIA_HANDLER_ELEMENT_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "include.hpp"
+#include "message.hpp"
+#include "rtp.hpp"
+
+namespace rtc {
+
+using ChainedMessagesProduct = std::shared_ptr<std::vector<binary_ptr>>;
+
+RTC_CPP_EXPORT ChainedMessagesProduct make_chained_messages_product();
+RTC_CPP_EXPORT ChainedMessagesProduct make_chained_messages_product(message_ptr msg);
+
+/// Ougoing messages
+struct RTC_CPP_EXPORT ChainedOutgoingProduct {
+	ChainedOutgoingProduct(ChainedMessagesProduct messages = nullptr, message_ptr control = nullptr);
+	const ChainedMessagesProduct messages;
+	const message_ptr control;
+};
+
+/// Incoming messages with response
+struct RTC_CPP_EXPORT ChainedIncomingProduct {
+	ChainedIncomingProduct(ChainedMessagesProduct incoming = nullptr, ChainedMessagesProduct outgoing = nullptr);
+	const ChainedMessagesProduct incoming;
+	const ChainedOutgoingProduct outgoing;
+};
+
+/// Incoming control messages with response
+struct RTC_CPP_EXPORT ChainedIncomingControlProduct {
+	ChainedIncomingControlProduct(message_ptr incoming, std::optional<ChainedOutgoingProduct> outgoing = nullopt);
+	const message_ptr incoming;
+	const std::optional<ChainedOutgoingProduct> outgoing;
+};
+
+/// Chainable handler
+class RTC_CPP_EXPORT MediaHandlerElement: public std::enable_shared_from_this<MediaHandlerElement> {
+	std::shared_ptr<MediaHandlerElement> upstream = nullptr;
+	std::shared_ptr<MediaHandlerElement> downstream = nullptr;
+
+	void prepareAndSendResponse(std::optional<ChainedOutgoingProduct> outgoing, std::function<bool (ChainedOutgoingProduct)> send);
+
+	void removeFromChain();
+public:
+	MediaHandlerElement();
+
+	/// Creates response to incoming message
+	/// @param messages Current repsonse
+	/// @returns New response
+	std::optional<ChainedOutgoingProduct> processOutgoingResponse(ChainedOutgoingProduct messages);
+
+	// Process incoming and ougoing messages
+	message_ptr formIncomingControlMessage(message_ptr message, std::function<bool (ChainedOutgoingProduct)> send);
+	ChainedMessagesProduct formIncomingBinaryMessage(ChainedMessagesProduct messages, std::function<bool (ChainedOutgoingProduct)> send);
+	message_ptr formOutgoingControlMessage(message_ptr message);
+	std::optional<ChainedOutgoingProduct> formOutgoingBinaryMessage(ChainedOutgoingProduct product);
+
+	/// Process current control message
+	/// @param messages current message
+	/// @returns Modified message and response
+	virtual ChainedIncomingControlProduct processIncomingControlMessage(message_ptr messages);
+
+	/// Process current control message
+	/// @param messages current message
+	/// @returns Modified message
+	virtual message_ptr processOutgoingControlMessage(message_ptr messages);
+
+	/// Process current binary message
+	/// @param messages current message
+	/// @returns Modified message and response
+	virtual ChainedIncomingProduct processIncomingBinaryMessage(ChainedMessagesProduct messages);
+
+	/// Process current binary message
+	/// @param messages current message
+	/// @param control current control message
+	/// @returns Modified binary message and control message
+	virtual ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control);
+
+	/// Set given element as upstream to this
+	/// @param upstream Upstream element
+	/// @returns Upstream element
+	std::shared_ptr<MediaHandlerElement> chainWith(std::shared_ptr<MediaHandlerElement> upstream);
+
+	/// Remove all downstream elements from chain
+	void recursiveRemoveChain();
+};
+
+} // namespace rtc
+
+#endif // RTC_ENABLE_MEDIA
+
+#endif // RTC_MEDIA_HANDLER_ELEMENT_H

+ 45 - 0
include/rtc/mediahandlerrootelement.hpp

@@ -0,0 +1,45 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTCP_MEDIA_HANDLER_ROOT_ELEMENT_H
+#define RTCP_MEDIA_HANDLER_ROOT_ELEMENT_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "mediahandlerelement.hpp"
+
+namespace rtc {
+
+/// Chainable message handler
+class RTC_CPP_EXPORT MediaHandlerRootElement : public MediaHandlerElement {
+public:
+	MediaHandlerRootElement() { }
+
+	/// Reduce multiple messages into one message
+	/// @param messages Messages to reduce
+	virtual message_ptr reduce(ChainedMessagesProduct messages);
+
+	/// Splits message into multiple messages
+	/// @param message Message to split
+	virtual ChainedMessagesProduct split(message_ptr message);
+};
+
+} // namespace rtc
+
+#endif // RTC_ENABLE_MEDIA
+
+#endif // RTCP_MEDIA_HANDLER_ROOT_ELEMENT_H

+ 4 - 5
include/rtc/nalunit.hpp

@@ -101,9 +101,9 @@ struct RTC_CPP_EXPORT NalUnitFragmentA : NalUnit {
 	enum class FragmentType { Start, Middle, End };
 
 	NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri, uint8_t unitType,
-	                 binary data);
+					 binary data);
 
-	static std::vector<NalUnitFragmentA> fragmentsFrom(NalUnit nalu, uint16_t maximumFragmentSize);
+	static std::vector<std::shared_ptr<NalUnitFragmentA>> fragmentsFrom(std::shared_ptr<NalUnit> nalu, uint16_t maximumFragmentSize);
 
 	uint8_t unitType() { return fragmentHeader()->unitType(); }
 
@@ -142,11 +142,10 @@ protected:
 	const uint8_t nal_type_fu_A = 28;
 };
 
-class RTC_CPP_EXPORT NalUnits : public std::vector<NalUnit> {
+class RTC_CPP_EXPORT NalUnits : public std::vector<std::shared_ptr<NalUnit>> {
 public:
 	static const uint16_t defaultMaximumFragmentSize = 1400;
-	std::vector<binary>
-	generateFragments(uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
+	std::vector<std::shared_ptr<binary>> generateFragments(uint16_t maximumFragmentSize);
 };
 
 } // namespace rtc

+ 2 - 12
include/rtc/opuspacketizationhandler.hpp

@@ -22,27 +22,17 @@
 #if RTC_ENABLE_MEDIA
 
 #include "opusrtppacketizer.hpp"
-#include "rtcphandler.hpp"
-#include "rtcpsenderreporter.hpp"
+#include "mediachainablehandler.hpp"
 
 namespace rtc {
 
 /// Handler for opus packetization
-class RTC_CPP_EXPORT OpusPacketizationHandler : public RtcpHandler, public RtcpSenderReporter {
-	/// RTP packetizer for opus
-	const std::shared_ptr<OpusRtpPacketizer> packetizer;
+class RTC_CPP_EXPORT OpusPacketizationHandler : public MediaChainableHandler {
 
 public:
 	/// Construct handler for opus packetization.
 	/// @param packetizer RTP packetizer for opus
 	OpusPacketizationHandler(std::shared_ptr<OpusRtpPacketizer> packetizer);
-
-	/// Returns message unchanged
-	/// @param ptr message
-	message_ptr incoming(message_ptr ptr) override;
-	/// Returns packetized message if message type is binary
-	/// @param ptr message
-	message_ptr outgoing(message_ptr ptr) override;
 };
 
 } // namespace rtc

+ 9 - 3
include/rtc/opusrtppacketizer.hpp

@@ -22,12 +22,12 @@
 #if RTC_ENABLE_MEDIA
 
 #include "rtppacketizer.hpp"
+#include "mediahandlerrootelement.hpp"
 
 namespace rtc {
 
 /// RTP packetizer for opus
-class RTC_CPP_EXPORT OpusRtpPacketizer : public RtpPacketizer {
-
+class RTC_CPP_EXPORT OpusRtpPacketizer : public RtpPacketizer, public MediaHandlerRootElement {
 public:
 	/// default clock rate used in opus RTP communication
 	static const uint32_t defaultClockRate = 48 * 1000;
@@ -42,7 +42,13 @@ public:
 	/// @note This function increase sequence number after packetization.
 	/// @param payload RTP payload
 	/// @param setMark This needs to be `false` for all RTP packets with opus payload
-	message_ptr packetize(binary payload, bool setMark) override;
+	binary_ptr packetize(binary_ptr payload, bool setMark) override;
+
+	/// Creates RTP packet for given samples (all samples share same RTP timesamp)
+	/// @param messages opus samples
+	/// @param control RTCP
+	/// @returns RTP packets and unchanged `control`
+	ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
 };
 
 } // namespace rtc

+ 12 - 3
include/rtc/rtc.h

@@ -41,6 +41,7 @@ extern "C" {
 
 #if RTC_ENABLE_MEDIA
 #define RTC_DEFAULT_MAXIMUM_FRAGMENT_SIZE ((uint16_t)1400)
+#define RTC_DEFAULT_MAXIMUM_PACKET_COUNT_FOR_NACK_CACHE ((unsigned)512)
 #endif
 
 #include <stdbool.h>
@@ -240,6 +241,15 @@ RTC_EXPORT int rtcSetH264PacketizationHandler(int tr, uint32_t ssrc, const char
 /// @param _timestamp Timestamp
 RTC_EXPORT int rtcSetOpusPacketizationHandler(int tr, uint32_t ssrc, const char * cname, uint8_t payloadType, uint32_t clockRate, uint16_t _sequenceNumber, uint32_t _timestamp);
 
+/// Chain RtcpSrReporter to handler chain for given track
+/// @param tr Track id
+int rtcChainRtcpSrReporter(int tr);
+
+/// Chain RtcpNackResponder to handler chain for given track
+/// @param tr Track id
+/// @param maxStoredPacketsCount Maximum stored packet count
+int rtcChainRtcpNackResponder(int tr, unsigned maxStoredPacketsCount);
+
 /// Set start time for RTP stream
 /// @param startTime_s Start time in seconds
 /// @param timeIntervalSince1970 Set true if `startTime_s` is time interval since 1970, false if `startTime_s` is time interval since 1900
@@ -250,7 +260,6 @@ int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeInterva
 /// @param id Track identifier
 int rtcStartRtcpSenderReporterRecording(int id);
 
-
 /// Transform seconds to timestamp using track's clock rate
 /// @param id Track id
 /// @param seconds Seconds
@@ -283,9 +292,9 @@ int rtcSetTrackRTPTimestamp(int id, uint32_t timestamp);
 /// @param timestamp Pointer for result
 int rtcGetPreviousTrackSenderReportTimestamp(int id, uint32_t * timestamp);
 
-/// Set `NeedsToReport` flag in RtcpSenderReporter handler identified by given track id
+/// Set `NeedsToReport` flag in RtcpSrReporter handler identified by given track id
 /// @param id Track id
-int rtcSetNeedsToSendRTCPSR(int id);
+int rtcSetNeedsToSendRtcpSr(int id);
 
 #endif // RTC_ENABLE_MEDIA
 

+ 4 - 1
include/rtc/rtc.hpp

@@ -27,8 +27,11 @@
 
 #if RTC_ENABLE_MEDIA
 
-// RTCP handling
+// Media handling
 #include "rtcpreceivingsession.hpp"
+#include "mediachainablehandler.hpp"
+#include "rtcpsrreporter.hpp"
+#include "rtcpnackresponder.hpp"
 
 // Opus/h264 streaming
 #include "h264packetizationhandler.hpp"

+ 94 - 0
include/rtc/rtcpnackresponder.hpp

@@ -0,0 +1,94 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTC_RTCP_NACK_RESPONDER_H
+#define RTC_RTCP_NACK_RESPONDER_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "mediahandlerelement.hpp"
+
+#include <unordered_map>
+#include <queue>
+
+namespace rtc {
+
+class RTC_CPP_EXPORT RtcpNackResponder: public MediaHandlerElement {
+
+	/// Packet storage
+	class RTC_CPP_EXPORT Storage {
+		
+		/// Packet storage element
+		struct RTC_CPP_EXPORT Element {
+			Element(binary_ptr packet, uint16_t sequenceNumber, std::shared_ptr<Element> next = nullptr);
+			const binary_ptr packet;
+			const uint16_t sequenceNumber;
+			/// Pointer to newer element
+			std::shared_ptr<Element> next = nullptr;
+		};
+
+	private:
+		/// Oldest packet in storage
+		std::shared_ptr<Element> oldest = nullptr;
+		/// Newest packet in storage
+		std::shared_ptr<Element> newest = nullptr;
+
+		/// Inner storage
+		std::unordered_map<uint16_t, std::shared_ptr<Element>> storage{};
+
+		/// Maximum storage size
+		const unsigned maximumSize;
+
+		/// Returnst current size
+		unsigned size();
+
+	public:
+		static const unsigned defaultMaximumSize = 512;
+
+		Storage(unsigned _maximumSize);
+
+		/// Returns packet with given sequence number
+		std::optional<binary_ptr> get(uint16_t sequenceNumber);
+
+		/// Stores packet
+		/// @param packet Packet
+		void store(binary_ptr packet);
+	};
+
+	const std::shared_ptr<Storage> storage;
+	std::mutex reportMutex;
+
+public:
+	RtcpNackResponder(unsigned maxStoredPacketCount = Storage::defaultMaximumSize);
+
+	/// Checks for RTCP NACK and handles it,
+	/// @param message RTCP message
+	/// @returns unchanged RTCP message and requested RTP packets
+	ChainedIncomingControlProduct processIncomingControlMessage(message_ptr message) override;
+
+	/// Stores RTP packets in internal storage
+	/// @param messages RTP packets
+	/// @param control RTCP
+	/// @returns Unchanged RTP and RTCP
+	ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
+};
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */
+
+#endif /* RTC_RTCP_NACK_RESPONDER_H */

+ 2 - 2
include/rtc/rtcpreceivingsession.hpp

@@ -23,14 +23,14 @@
 #if RTC_ENABLE_MEDIA
 
 #include "include.hpp"
-#include "rtcphandler.hpp"
+#include "mediahandler.hpp"
 #include "message.hpp"
 #include "rtp.hpp"
 
 namespace rtc {
 
 // An RtcpSession can be plugged into a Track to handle the whole RTCP session
-class RTC_CPP_EXPORT RtcpReceivingSession : public RtcpHandler {
+class RTC_CPP_EXPORT RtcpReceivingSession : public MediaHandler {
 public:
 	message_ptr incoming(message_ptr ptr) override;
 	message_ptr outgoing(message_ptr ptr) override;

+ 7 - 30
include/rtc/rtcpsenderreporter.hpp → include/rtc/rtcpsrreporter.hpp

@@ -1,5 +1,4 @@
-/*
- * libdatachannel streamer example
+/**
  * Copyright (c) 2020 Filip Klembara (in2core)
  *
  * This program is free software; you can redistribute it and/or
@@ -23,11 +22,12 @@
 
 #include "message.hpp"
 #include "rtppacketizationconfig.hpp"
+#include "mediahandlerelement.hpp"
 
 namespace rtc {
 
-/// Class for sending RTCP SR
-class RTC_CPP_EXPORT RtcpSenderReporter {
+class RTC_CPP_EXPORT RtcpSrReporter: public MediaHandlerElement {
+
 	bool needsToReport = false;
 
 	uint32_t packetCount = 0;
@@ -39,10 +39,6 @@ class RTC_CPP_EXPORT RtcpSenderReporter {
 	void addToReport(RTP *rtp, uint32_t rtpSize);
 	message_ptr getSenderReport(uint32_t timestamp);
 
-protected:
-	/// Outgoing callback for sender reports
-	synchronized_callback<message_ptr> senderReportOutgoingCallback;
-
 public:
 	static uint64_t secondsToNTP(double seconds);
 
@@ -52,7 +48,9 @@ public:
 	/// RTP configuration
 	const std::shared_ptr<RtpPacketizationConfig> rtpConfig;
 
-	RtcpSenderReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
+	RtcpSrReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
+
+	ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
 
 	/// Set `needsToReport` flag. Sender report will be sent before next RTP packet with same
 	/// timestamp.
@@ -63,27 +61,6 @@ public:
 	/// @note `time_offset = rtpConfig->startTime_s -
 	/// rtpConfig->timestampToSeconds(rtpConfig->timestamp)`
 	void startRecording();
-
-	/// Send RTCP SR with given timestamp
-	/// @param timestamp timestamp of the RTCP SR
-	void sendReport(uint32_t timestamp);
-
-protected:
-	/// Calls given block with function for statistics. Sends RTCP SR packet with current timestamp
-	/// before `block` call if `needs_to_report` flag is true.
-	/// @param block Block of code to run. This block has function for rtp stats recording.
-	template <typename T>
-	T withStatsRecording(std::function<T(std::function<void(message_ptr)>)> block) {
-		if (needsToReport) {
-			sendReport(rtpConfig->timestamp);
-			needsToReport = false;
-		}
-		auto result = block([this](message_ptr _rtp) {
-			auto rtp = reinterpret_cast<RTP *>(_rtp->data());
-			this->addToReport(rtp, _rtp->size());
-		});
-		return result;
-	}
 };
 
 } // namespace rtc

+ 29 - 6
include/rtc/rtp.hpp

@@ -617,8 +617,31 @@ struct RTCP_FIR {
 };
 
 struct RTCP_NACK_PART {
-	uint16_t pid;
-	uint16_t blp;
+	uint16_t _pid;
+	uint16_t _blp;
+
+	uint16_t getPID() { return ntohs(_pid); }
+	uint16_t getBLP() { return ntohs(_blp); }
+
+	void setPID(uint16_t pid) { _pid = htons(pid); }
+	void setBLP(uint16_t blp) { _blp = htons(blp); }
+
+	std::vector<uint16_t> getSequenceNumbers() {
+		std::vector<uint16_t> result{};
+		result.reserve(17);
+		auto pid = getPID();
+		result.push_back(pid);
+		auto bitmask = getBLP();
+		auto i = pid + 1;
+		while (bitmask > 0) {
+			if (bitmask & 0x1) {
+				result.push_back(i);
+			}
+			i += 1;
+			bitmask >>= 1;
+		}
+		return result;
+	}
 };
 
 class RTCP_NACK {
@@ -644,16 +667,16 @@ public:
 	 */
 	bool addMissingPacket(unsigned int *fciCount, uint16_t *fciPID, uint16_t missingPacket) {
 		if (*fciCount == 0 || missingPacket < *fciPID || missingPacket > (*fciPID + 16)) {
-			parts[*fciCount].pid = htons(missingPacket);
-			parts[*fciCount].blp = 0;
+			parts[*fciCount].setPID(missingPacket);
+			parts[*fciCount].setBLP(0);
 			*fciPID = missingPacket;
 			(*fciCount)++;
 			return true;
 		} else {
 			// TODO SPEEED!
-			auto blp = ntohs(parts[(*fciCount) - 1].blp);
+			auto blp = parts[(*fciCount) - 1].getBLP();
 			auto newBit = 1u << (unsigned int)(missingPacket - (1 + *fciPID));
-			parts[(*fciCount) - 1].blp = htons(blp | newBit);
+			parts[(*fciCount) - 1].setBLP(blp | newBit);
 			return false;
 		}
 	}

+ 1 - 1
include/rtc/rtppacketizer.hpp

@@ -44,7 +44,7 @@ public:
 	/// @note This function increase sequence number after packetization.
 	/// @param payload RTP payload
 	/// @param setMark Set marker flag in RTP packet if true
-	virtual message_ptr packetize(binary payload, bool setMark);
+	virtual std::shared_ptr<binary> packetize(std::shared_ptr<binary> payload, bool setMark);
 };
 
 } // namespace rtc

+ 4 - 4
include/rtc/track.hpp

@@ -24,7 +24,7 @@
 #include "include.hpp"
 #include "message.hpp"
 #include "queue.hpp"
-#include "rtcphandler.hpp"
+#include "mediahandler.hpp"
 
 #include <atomic>
 #include <variant>
@@ -62,8 +62,8 @@ public:
 	bool requestKeyframe();
 
 	// RTCP handler
-	void setRtcpHandler(std::shared_ptr<RtcpHandler> handler);
-	std::shared_ptr<RtcpHandler> getRtcpHandler();
+	void setRtcpHandler(std::shared_ptr<MediaHandler> handler);
+	std::shared_ptr<MediaHandler> getRtcpHandler();
 
 private:
 #if RTC_ENABLE_MEDIA
@@ -80,7 +80,7 @@ private:
 	Queue<message_ptr> mRecvQueue;
 
 	std::shared_mutex mRtcpHandlerMutex;
-	std::shared_ptr<RtcpHandler> mRtcpHandler;
+	std::shared_ptr<MediaHandler> mRtcpHandler;
 
 	friend class PeerConnection;
 };

+ 59 - 24
src/capi.cpp

@@ -53,7 +53,8 @@ std::unordered_map<int, shared_ptr<PeerConnection>> peerConnectionMap;
 std::unordered_map<int, shared_ptr<DataChannel>> dataChannelMap;
 std::unordered_map<int, shared_ptr<Track>> trackMap;
 #if RTC_ENABLE_MEDIA
-std::unordered_map<int, shared_ptr<RtcpSenderReporter>> rtcpSenderMap;
+std::unordered_map<int, shared_ptr<MediaChainableHandler>> rtcpChainableHandlerMap;
+std::unordered_map<int, shared_ptr<RtcpSrReporter>> rtcpSrReporterMap;
 std::unordered_map<int, shared_ptr<RtpPacketizationConfig>> rtpConfigMap;
 #endif
 #if RTC_ENABLE_WEBSOCKET
@@ -141,7 +142,8 @@ void eraseTrack(int tr) {
 	if (trackMap.erase(tr) == 0)
 		throw std::invalid_argument("Track ID does not exist");
 #if RTC_ENABLE_MEDIA
-	rtcpSenderMap.erase(tr);
+	rtcpSrReporterMap.erase(tr);
+	rtcpChainableHandlerMap.erase(tr);
 	rtpConfigMap.erase(tr);
 #endif
 	userPointerMap.erase(tr);
@@ -149,25 +151,41 @@ void eraseTrack(int tr) {
 
 #if RTC_ENABLE_MEDIA
 
-shared_ptr<RtcpSenderReporter> getRTCPSender(int id) {
+shared_ptr<RtcpSrReporter> getRtcpSrReporter(int id) {
 	std::lock_guard lock(mutex);
-	if (auto it = rtcpSenderMap.find(id); it != rtcpSenderMap.end())
+	if (auto it = rtcpSrReporterMap.find(id); it != rtcpSrReporterMap.end()) {
 		return it->second;
-	else
-		throw std::invalid_argument("RtcpSenderReporter ID does not exist");
+	} else {
+		throw std::invalid_argument("RtcpSRReporter ID does not exist");
+	}
+}
+
+void emplaceRtcpSrReporter(shared_ptr<RtcpSrReporter> ptr, int tr) {
+	std::lock_guard lock(mutex);
+	rtcpSrReporterMap.emplace(std::make_pair(tr, ptr));
+}
+
+shared_ptr<MediaChainableHandler> getMediaChainableHandler(int id) {
+	std::lock_guard lock(mutex);
+	if (auto it = rtcpChainableHandlerMap.find(id); it != rtcpChainableHandlerMap.end()) {
+		return it->second;
+	} else {
+		throw std::invalid_argument("RtcpChainableHandler ID does not exist");
+	}
 }
 
-void emplaceRTCPSender(shared_ptr<RtcpSenderReporter> ptr, int tr) {
+void emplaceMediaChainableHandler(shared_ptr<MediaChainableHandler> ptr, int tr) {
 	std::lock_guard lock(mutex);
-	rtcpSenderMap.emplace(std::make_pair(tr, ptr));
+	rtcpChainableHandlerMap.emplace(std::make_pair(tr, ptr));
 }
 
 shared_ptr<RtpPacketizationConfig> getRTPConfig(int id) {
 	std::lock_guard lock(mutex);
-	if (auto it = rtpConfigMap.find(id); it != rtpConfigMap.end())
+	if (auto it = rtpConfigMap.find(id); it != rtpConfigMap.end()) {
 		return it->second;
-	else
+	} else {
 		throw std::invalid_argument("RTPConfiguration ID does not exist");
+	}
 }
 
 void emplaceRTPConfig(shared_ptr<RtpPacketizationConfig> ptr, int tr) {
@@ -539,13 +557,12 @@ int rtcSetH264PacketizationHandler(int tr, uint32_t ssrc, const char *cname, uin
 		auto track = getTrack(tr);
 		// create RTP configuration
 		auto rtpConfig = getNewRtpPacketizationConfig(ssrc, cname, payloadType, clockRate,
-		                                              sequenceNumber, timestamp);
+													  sequenceNumber, timestamp);
 		// create packetizer
-		auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(rtpConfig));
-		// create H264 and RTCP SP handler
-		shared_ptr<H264PacketizationHandler> h264Handler(
-		    new H264PacketizationHandler(H264PacketizationHandler::Separator::Length, packetizer, maxFragmentSize));
-		emplaceRTCPSender(h264Handler, tr);
+		auto packetizer = std::make_shared<H264RtpPacketizer>(rtpConfig, maxFragmentSize);
+		// create H264 handler
+		auto h264Handler = std::make_shared<H264PacketizationHandler>(packetizer);
+		emplaceMediaChainableHandler(h264Handler, tr);
 		emplaceRTPConfig(rtpConfig, tr);
 		// set handler
 		track->setRtcpHandler(h264Handler);
@@ -561,16 +578,34 @@ int rtcSetOpusPacketizationHandler(int tr, uint32_t ssrc, const char *cname, uin
 		auto rtpConfig = getNewRtpPacketizationConfig(ssrc, cname, payloadType, clockRate,
 		                                              sequenceNumber, timestamp);
 		// create packetizer
-		auto packetizer = shared_ptr<OpusRtpPacketizer>(new OpusRtpPacketizer(rtpConfig));
-		// create Opus and RTCP SP handler
-		shared_ptr<OpusPacketizationHandler> opusHandler(new OpusPacketizationHandler(packetizer));
-		emplaceRTCPSender(opusHandler, tr);
+		auto packetizer = std::make_shared<OpusRtpPacketizer>(rtpConfig);
+		// create Opus handler
+		auto opusHandler = std::make_shared<OpusPacketizationHandler>(packetizer);
+        emplaceMediaChainableHandler(opusHandler, tr);
 		emplaceRTPConfig(rtpConfig, tr);
 		// set handler
 		track->setRtcpHandler(opusHandler);
 	});
 }
 
+int rtcChainRtcpSrReporter(int tr) {
+	return WRAP({
+		auto config = getRTPConfig(tr);
+		auto reporter = std::make_shared<RtcpSrReporter>(config);
+		emplaceRtcpSrReporter(reporter, tr);
+		auto chainableHandler = getMediaChainableHandler(tr);
+		chainableHandler->addToChain(reporter);
+	});
+}
+
+int rtcChainRtcpNackResponder(int tr, unsigned maxStoredPacketsCount) {
+	return WRAP({
+		auto responder = std::make_shared<RtcpNackResponder>(maxStoredPacketsCount);
+		auto chainableHandler = getMediaChainableHandler(tr);
+		chainableHandler->addToChain(responder);
+	});
+}
+
 int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeIntervalSince1970,
                                     uint32_t timestamp) {
 	return WRAP({
@@ -585,7 +620,7 @@ int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeInterva
 
 int rtcStartRtcpSenderReporterRecording(int id) {
 	return WRAP({
-		auto sender = getRTCPSender(id);
+		auto sender = getRtcpSrReporter(id);
 		sender->startRecording();
 	});
 }
@@ -627,14 +662,14 @@ int rtcSetTrackRTPTimestamp(int id, uint32_t timestamp) {
 
 int rtcGetPreviousTrackSenderReportTimestamp(int id, uint32_t *timestamp) {
 	return WRAP({
-		auto sender = getRTCPSender(id);
+		auto sender = getRtcpSrReporter(id);
 		*timestamp = sender->previousReportedTimestamp;
 	});
 }
 
-int rtcSetNeedsToSendRTCPSR(int id) {
+int rtcSetNeedsToSendRtcpSr(int id) {
 	return WRAP({
-		auto sender = getRTCPSender(id);
+		auto sender = getRtcpSrReporter(id);
 		sender->setNeedsToReport();
 	});
 }

+ 1 - 139
src/h264packetizationhandler.cpp

@@ -22,145 +22,7 @@
 
 namespace rtc {
 
-using std::function;
-using std::make_shared;
-using std::shared_ptr;
-
-typedef enum {
-	NUSM_noMatch,
-	NUSM_firstZero,
-	NUSM_secondZero,
-	NUSM_thirdZero,
-	NUSM_shortMatch,
-	NUSM_longMatch
-} NalUnitStartSequenceMatch;
-
-NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, byte _byte,
-                                                 H264PacketizationHandler::Separator separator) {
-	assert(separator != H264PacketizationHandler::Separator::Length);
-	auto byte = (uint8_t)_byte;
-	auto detectShort = separator == H264PacketizationHandler::Separator::ShortStartSequence ||
-	                   separator == H264PacketizationHandler::Separator::StartSequence;
-	auto detectLong = separator == H264PacketizationHandler::Separator::LongStartSequence ||
-	                  separator == H264PacketizationHandler::Separator::StartSequence;
-	switch (match) {
-	case NUSM_noMatch:
-		if (byte == 0x00) {
-			return NUSM_firstZero;
-		}
-		break;
-	case NUSM_firstZero:
-		if (byte == 0x00) {
-			return NUSM_secondZero;
-		}
-		break;
-	case NUSM_secondZero:
-		if (byte == 0x00 && detectLong) {
-			return NUSM_thirdZero;
-		} else if (byte == 0x01 && detectShort) {
-			return NUSM_shortMatch;
-		}
-		break;
-	case NUSM_thirdZero:
-		if (byte == 0x01 && detectLong) {
-			return NUSM_longMatch;
-		}
-		break;
-	case NUSM_shortMatch:
-		return NUSM_shortMatch;
-	case NUSM_longMatch:
-		return NUSM_longMatch;
-	}
-	return NUSM_noMatch;
-}
-
-message_ptr H264PacketizationHandler::incoming(message_ptr ptr) { return ptr; }
-
-shared_ptr<NalUnits> H264PacketizationHandler::splitMessage(message_ptr message) {
-	auto nalus = make_shared<NalUnits>();
-	if (separator == Separator::Length) {
-		unsigned long long index = 0;
-		while (index < message->size()) {
-			assert(index + 4 < message->size());
-			if (index + 4 >= message->size()) {
-				LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!";
-				break;
-			}
-			auto lengthPtr = (uint32_t *)(message->data() + index);
-			uint32_t length = ntohl(*lengthPtr);
-			auto naluStartIndex = index + 4;
-			auto naluEndIndex = naluStartIndex + length;
-
-			assert(naluEndIndex <= message->size());
-			if (naluEndIndex > message->size()) {
-				LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!";
-				break;
-			}
-			nalus->push_back(
-			    NalUnit(message->begin() + naluStartIndex, message->begin() + naluEndIndex));
-			index = naluEndIndex;
-		}
-	} else {
-		NalUnitStartSequenceMatch match = NUSM_noMatch;
-		unsigned long long index = 0;
-		while (index < message->size()) {
-			match = StartSequenceMatchSucc(match, (*message)[index++], separator);
-			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
-				match = NUSM_noMatch;
-				break;
-			}
-		}
-		unsigned long long naluStartIndex = index;
-
-		while (index < message->size()) {
-			match = StartSequenceMatchSucc(match, (*message)[index], separator);
-			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
-				auto sequenceLength = match == NUSM_longMatch ? 4 : 3;
-				unsigned long long naluEndIndex = index - sequenceLength;
-				match = NUSM_noMatch;
-				nalus->push_back(NalUnit(message->begin() + naluStartIndex,
-				                         message->begin() + naluEndIndex + 1));
-				naluStartIndex = index + 1;
-			}
-			index++;
-		}
-		nalus->push_back(NalUnit(message->begin() + naluStartIndex, message->end()));
-	}
-	return nalus;
-}
-
-message_ptr H264PacketizationHandler::outgoing(message_ptr ptr) {
-	if (ptr->type == Message::Binary) {
-		auto nalus = splitMessage(ptr);
-		auto fragments = nalus->generateFragments(maximumFragmentSize);
-
-		auto lastPacket = withStatsRecording<message_ptr>(
-		    [fragments, this](function<void(message_ptr)> addToReport) {
-			    for (unsigned long long index = 0; index < fragments.size() - 1; index++) {
-				    auto packet = packetizer->packetize(fragments[index], false);
-
-				    addToReport(packet);
-
-				    outgoingCallback(std::move(packet));
-			    }
-			    // packet is last, marker must be set
-			    auto lastPacket = packetizer->packetize(fragments[fragments.size() - 1], true);
-			    addToReport(lastPacket);
-			    return lastPacket;
-		    });
-		return lastPacket;
-	}
-	return ptr;
-}
-
-H264PacketizationHandler::H264PacketizationHandler(Separator separator,
-                                                   std::shared_ptr<H264RtpPacketizer> packetizer,
-                                                   uint16_t maximumFragmentSize)
-    : RtcpHandler(), RtcpSenderReporter(packetizer->rtpConfig), packetizer(packetizer),
-      maximumFragmentSize(maximumFragmentSize), separator(separator) {
-
-	senderReportOutgoingCallback = [this](message_ptr msg) { outgoingCallback(msg); };
-}
+H264PacketizationHandler::H264PacketizationHandler(std::shared_ptr<H264RtpPacketizer> packetizer): MediaChainableHandler(packetizer) { }
 
 } // namespace rtc
 

+ 133 - 2
src/h264rtppacketizer.cpp

@@ -22,8 +22,139 @@
 
 namespace rtc {
 
-H264RtpPacketizer::H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
-    : RtpPacketizer(rtpConfig) {}
+using std::make_shared;
+using std::shared_ptr;
+
+typedef enum {
+	NUSM_noMatch,
+	NUSM_firstZero,
+	NUSM_secondZero,
+	NUSM_thirdZero,
+	NUSM_shortMatch,
+	NUSM_longMatch
+} NalUnitStartSequenceMatch;
+
+NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, byte _byte,
+												 H264RtpPacketizer::Separator separator) {
+	assert(separator != H264RtpPacketizer::Separator::Length);
+	auto byte = (uint8_t)_byte;
+	auto detectShort = separator == H264RtpPacketizer::Separator::ShortStartSequence ||
+	separator == H264RtpPacketizer::Separator::StartSequence;
+	auto detectLong = separator == H264RtpPacketizer::Separator::LongStartSequence ||
+	separator == H264RtpPacketizer::Separator::StartSequence;
+	switch (match) {
+		case NUSM_noMatch:
+			if (byte == 0x00) {
+				return NUSM_firstZero;
+			}
+			break;
+		case NUSM_firstZero:
+			if (byte == 0x00) {
+				return NUSM_secondZero;
+			}
+			break;
+		case NUSM_secondZero:
+			if (byte == 0x00 && detectLong) {
+				return NUSM_thirdZero;
+			} else if (byte == 0x01 && detectShort) {
+				return NUSM_shortMatch;
+			}
+			break;
+		case NUSM_thirdZero:
+			if (byte == 0x01 && detectLong) {
+				return NUSM_longMatch;
+			}
+			break;
+		case NUSM_shortMatch:
+			return NUSM_shortMatch;
+		case NUSM_longMatch:
+			return NUSM_longMatch;
+	}
+	return NUSM_noMatch;
+}
+
+shared_ptr<NalUnits> H264RtpPacketizer::splitMessage(binary_ptr message) {
+	auto nalus = make_shared<NalUnits>();
+	if (separator == Separator::Length) {
+		unsigned long long index = 0;
+		while (index < message->size()) {
+			assert(index + 4 < message->size());
+			if (index + 4 >= message->size()) {
+				LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!";
+				break;
+			}
+			auto lengthPtr = (uint32_t *)(message->data() + index);
+			uint32_t length = ntohl(*lengthPtr);
+			auto naluStartIndex = index + 4;
+			auto naluEndIndex = naluStartIndex + length;
+
+			assert(naluEndIndex <= message->size());
+			if (naluEndIndex > message->size()) {
+				LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!";
+				break;
+			}
+			auto begin = message->begin() + naluStartIndex;
+			auto end = message->begin() + naluEndIndex;
+			nalus->push_back(std::make_shared<NalUnit>(begin, end));
+			index = naluEndIndex;
+		}
+	} else {
+		NalUnitStartSequenceMatch match = NUSM_noMatch;
+		unsigned long long index = 0;
+		while (index < message->size()) {
+			match = StartSequenceMatchSucc(match, (*message)[index++], separator);
+			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
+				match = NUSM_noMatch;
+				break;
+			}
+		}
+
+		unsigned long long naluStartIndex = index;
+
+		while (index < message->size()) {
+			match = StartSequenceMatchSucc(match, (*message)[index], separator);
+			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
+				auto sequenceLength = match == NUSM_longMatch ? 4 : 3;
+				unsigned long long naluEndIndex = index - sequenceLength;
+				match = NUSM_noMatch;
+				auto begin = message->begin() + naluStartIndex;
+				auto end = message->begin() + naluEndIndex + 1;
+				nalus->push_back(std::make_shared<NalUnit>(begin, end));
+				naluStartIndex = index + 1;
+			}
+			index++;
+		}
+		auto begin = message->begin() + naluStartIndex;
+		auto end = message->end();
+		nalus->push_back(std::make_shared<NalUnit>(begin, end));
+	}
+	return nalus;
+}
+
+H264RtpPacketizer::H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+									 uint16_t maximumFragmentSize)
+: RtpPacketizer(rtpConfig), MediaHandlerRootElement(), maximumFragmentSize(maximumFragmentSize), separator(Separator::Length) {}
+
+H264RtpPacketizer::H264RtpPacketizer(H264RtpPacketizer::Separator separator, std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+									 uint16_t maximumFragmentSize)
+: RtpPacketizer(rtpConfig), MediaHandlerRootElement(), maximumFragmentSize(maximumFragmentSize), separator(separator) {}
+
+ChainedOutgoingProduct H264RtpPacketizer::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
+	ChainedMessagesProduct packets = std::make_shared<std::vector<binary_ptr>>();
+	for (auto message: *messages) {
+		auto nalus = splitMessage(message);
+		auto fragments = nalus->generateFragments(maximumFragmentSize);
+		if (fragments.size() == 0) {
+			return ChainedOutgoingProduct();
+		}
+		unsigned i = 0;
+		for (; i < fragments.size() - 1; i++) {
+			packets->push_back(packetize(fragments[i], false));
+		}
+		packets->push_back(packetize(fragments[i], true));
+	}
+	return {packets, control};
+}
 
 } // namespace rtc
 

+ 167 - 0
src/mediachainablehandler.cpp

@@ -0,0 +1,167 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "mediachainablehandler.hpp"
+
+namespace rtc {
+
+MediaChainableHandler::MediaChainableHandler(std::shared_ptr<MediaHandlerRootElement> root): MediaHandler(), root(root), leaf(root) { }
+
+MediaChainableHandler::~MediaChainableHandler() {
+	leaf->recursiveRemoveChain();
+}
+
+bool MediaChainableHandler::sendProduct(ChainedOutgoingProduct product) {
+	bool result = true;
+	if (product.control) {
+		assert(product.control->type == Message::Control);
+		auto sendResult = send(product.control);
+		if(!sendResult) {
+			LOG_DEBUG << "Failed to send control message";
+		}
+		result = result && sendResult;
+	}
+	if (product.messages) {
+		auto messages = product.messages;
+		for (unsigned i = 0; i < messages->size(); i++) {
+			auto message = messages->at(i);
+			if (!message) {
+				LOG_DEBUG << "Invalid message to send " << i + 1 << "/" << messages->size();
+			}
+			auto sendResult = send(make_message(*message));
+			if(!sendResult) {
+				LOG_DEBUG << "Failed to send message " << i + 1 << "/" << messages->size();
+			}
+			result = result && sendResult;
+		}
+	}
+	return result;
+}
+
+message_ptr MediaChainableHandler::handleIncomingBinary(message_ptr msg) {
+	assert(msg->type == Message::Binary);
+	auto messages = root->split(msg);
+	auto incoming = leaf->formIncomingBinaryMessage(messages, [this](ChainedOutgoingProduct outgoing) {
+		return sendProduct(outgoing);
+	});
+	if (incoming) {
+		return root->reduce(incoming);
+	} else {
+		return nullptr;
+	}
+}
+
+message_ptr MediaChainableHandler::handleIncomingControl(message_ptr msg) {
+	assert(msg->type == Message::Control);
+	auto incoming = leaf->formIncomingControlMessage(msg, [this](ChainedOutgoingProduct outgoing) {
+		return sendProduct(outgoing);
+	});
+	assert(!incoming || incoming->type == Message::Control);
+	return incoming;
+}
+
+message_ptr MediaChainableHandler::handleOutgoingBinary(message_ptr msg) {
+	assert(msg->type == Message::Binary);
+	auto messages = make_chained_messages_product(msg);
+	auto optOutgoing = root->formOutgoingBinaryMessage(ChainedOutgoingProduct(messages));
+	if (!optOutgoing.has_value()) {
+		LOG_ERROR << "Generating outgoing message failed";
+		return nullptr;
+	}
+	auto outgoing = optOutgoing.value();
+	if (outgoing.control) {
+		if(!send(outgoing.control)) {
+			LOG_DEBUG << "Failed to send control message";
+		}
+	}
+	auto lastMessage = outgoing.messages->back();
+	if (!lastMessage) {
+		LOG_DEBUG << "Invalid message to send";
+		return nullptr;
+	}
+	for (unsigned i = 0; i < outgoing.messages->size() - 1; i++) {
+		auto message = outgoing.messages->at(i);
+		if (!message) {
+			LOG_DEBUG << "Invalid message to send " << i + 1 << "/" << outgoing.messages->size();
+		}
+		if(!send(make_message(*message))) {
+			LOG_DEBUG << "Failed to send message " << i + 1 << "/" << outgoing.messages->size();
+		}
+	}
+	return make_message(*lastMessage);
+}
+
+message_ptr MediaChainableHandler::handleOutgoingControl(message_ptr msg) {
+	assert(msg->type == Message::Control);
+	auto outgoing = root->formOutgoingControlMessage(msg);
+	assert(!outgoing || outgoing->type == Message::Control);
+	if (!outgoing) {
+		LOG_ERROR << "Generating outgoing control message failed";
+		return nullptr;
+	}
+	return outgoing;
+}
+
+message_ptr MediaChainableHandler::outgoing(message_ptr ptr) {
+	assert(ptr);
+	if (!ptr) {
+		LOG_ERROR << "Outgoing message is nullptr, ignoring";
+		return nullptr;
+	}
+	std::lock_guard<std::mutex> guard(inoutMutex);
+	if (ptr->type == Message::Binary) {
+		return handleOutgoingBinary(ptr);
+	} else if (ptr->type == Message::Control) {
+		return handleOutgoingControl(ptr);
+	}
+	return ptr;
+}
+
+message_ptr MediaChainableHandler::incoming(message_ptr ptr) {
+	if (!ptr) {
+		LOG_ERROR << "Incoming message is nullptr, ignoring";
+		return nullptr;
+	}
+	std::lock_guard<std::mutex> guard(inoutMutex);
+	if (ptr->type == Message::Binary) {
+		return handleIncomingBinary(ptr);
+	} else if (ptr->type == Message::Control) {
+		return handleIncomingControl(ptr);
+	}
+	return ptr;
+}
+
+bool MediaChainableHandler::send(message_ptr msg) {
+	try {
+		outgoingCallback(std::move(msg));
+		return true;
+	} catch (const std::exception &e) {
+		LOG_DEBUG << "Send in RTCP chain handler failed: " << e.what();
+	}
+	return false;
+}
+
+void MediaChainableHandler::addToChain(std::shared_ptr<MediaHandlerElement> chainable) {
+	assert(leaf);
+	leaf = leaf->chainWith(chainable);
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */

+ 201 - 0
src/mediahandlerelement.cpp

@@ -0,0 +1,201 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "mediahandlerelement.hpp"
+
+namespace rtc {
+
+ChainedMessagesProduct make_chained_messages_product() {
+	return std::make_shared<std::vector<binary_ptr>>();
+}
+
+ChainedMessagesProduct make_chained_messages_product(message_ptr msg) {
+	std::vector<binary_ptr> msgs = {msg};
+	return std::make_shared<std::vector<binary_ptr>>(msgs);
+}
+
+ChainedOutgoingProduct::ChainedOutgoingProduct(ChainedMessagesProduct messages, message_ptr control)
+: messages(messages), control(control) { }
+
+ChainedIncomingProduct::ChainedIncomingProduct(ChainedMessagesProduct incoming, ChainedMessagesProduct outgoing)
+: incoming(incoming), outgoing(outgoing) { }
+
+ChainedIncomingControlProduct::ChainedIncomingControlProduct(message_ptr incoming, std::optional<ChainedOutgoingProduct> outgoing)
+: incoming(incoming), outgoing(outgoing) { }
+
+MediaHandlerElement::MediaHandlerElement() { }
+
+void MediaHandlerElement::removeFromChain() {
+	if (upstream) {
+		upstream->downstream = downstream;
+	}
+	if (downstream) {
+		downstream->upstream = upstream;
+	}
+	upstream = nullptr;
+	downstream = nullptr;
+}
+
+void MediaHandlerElement::recursiveRemoveChain() {
+	if (downstream) {
+		// `recursiveRemoveChain` removes last strong reference to downstream element
+		// we need to keep strong reference to prevent deallocation of downstream element
+		// during `recursiveRemoveChain`
+		auto strongDownstreamPtr = downstream;
+		downstream->recursiveRemoveChain();
+	}
+	removeFromChain();
+}
+
+std::optional<ChainedOutgoingProduct> MediaHandlerElement::processOutgoingResponse(ChainedOutgoingProduct messages) {
+	if (messages.messages) {
+		if (upstream) {
+			auto msgs = upstream->formOutgoingBinaryMessage(ChainedOutgoingProduct(messages.messages, messages.control));
+			if (msgs.has_value()) {
+				return msgs.value();
+			} else {
+				LOG_ERROR << "Generating outgoing message failed";
+				return nullopt;
+			}
+		} else {
+			return messages;
+		}
+	} else if (messages.control) {
+		if (upstream) {
+			auto control = upstream->formOutgoingControlMessage(messages.control);
+			if (control) {
+				return ChainedOutgoingProduct(nullptr, control);
+			} else {
+				LOG_ERROR << "Generating outgoing control message failed";
+				return nullopt;
+			}
+		} else {
+			return messages;
+		}
+	} else {
+		return ChainedOutgoingProduct();
+	}
+}
+
+void MediaHandlerElement::prepareAndSendResponse(std::optional<ChainedOutgoingProduct> outgoing, std::function<bool (ChainedOutgoingProduct)> send) {
+	if (outgoing.has_value()) {
+		auto message = outgoing.value();
+		auto response = processOutgoingResponse(message);
+		if (response.has_value()) {
+			if(!send(response.value())) {
+				LOG_DEBUG << "Send failed";
+			}
+		} else {
+			LOG_DEBUG << "No response to send";
+		}
+	}
+}
+
+message_ptr MediaHandlerElement::formIncomingControlMessage(message_ptr message, std::function<bool (ChainedOutgoingProduct)> send) {
+	assert(message);
+	auto product = processIncomingControlMessage(message);
+	prepareAndSendResponse(product.outgoing, send);
+	if (product.incoming) {
+		if (downstream) {
+			return downstream->formIncomingControlMessage(product.incoming, send);
+		} else {
+			return product.incoming;
+		}
+	} else {
+		return nullptr;
+	}
+}
+
+ChainedMessagesProduct MediaHandlerElement::formIncomingBinaryMessage(ChainedMessagesProduct messages, std::function<bool (ChainedOutgoingProduct)> send) {
+	assert(messages && !messages->empty());
+	auto product = processIncomingBinaryMessage(messages);
+	prepareAndSendResponse(product.outgoing, send);
+	if (product.incoming) {
+		if (downstream) {
+			return downstream->formIncomingBinaryMessage(product.incoming, send);
+		} else {
+			return product.incoming;
+		}
+	} else {
+		return nullptr;
+	}
+}
+
+message_ptr MediaHandlerElement::formOutgoingControlMessage(message_ptr message) {
+	assert(message);
+	auto newMessage = processOutgoingControlMessage(message);
+	assert(newMessage);
+	if(!newMessage) {
+		LOG_ERROR << "Failed to generate outgoing message";
+		return nullptr;
+	}
+	if (upstream) {
+		return upstream->formOutgoingControlMessage(newMessage);
+	} else {
+		return newMessage;
+	}
+}
+
+std::optional<ChainedOutgoingProduct> MediaHandlerElement::formOutgoingBinaryMessage(ChainedOutgoingProduct product) {
+	assert(product.messages && !product.messages->empty());
+	auto newProduct = processOutgoingBinaryMessage(product.messages, product.control);
+	assert(!product.control || newProduct.control);
+	assert(newProduct.messages && !newProduct.messages->empty());
+	if (product.control && !newProduct.control) {
+		LOG_ERROR << "Outgoing message must not remove control message";
+		return nullopt;
+	}
+	if (!newProduct.messages || newProduct.messages->empty()) {
+		LOG_ERROR << "Failed to generate message";
+		return nullopt;
+	}
+	if (upstream) {
+		return upstream->formOutgoingBinaryMessage(newProduct);
+	} else {
+		return newProduct;
+	}
+}
+
+ChainedIncomingControlProduct MediaHandlerElement::processIncomingControlMessage(message_ptr messages) {
+	return {messages};
+}
+
+message_ptr MediaHandlerElement::processOutgoingControlMessage(message_ptr messages) {
+	return messages;
+}
+
+ChainedIncomingProduct MediaHandlerElement::processIncomingBinaryMessage(ChainedMessagesProduct messages) {
+	return {messages};
+}
+
+ChainedOutgoingProduct MediaHandlerElement::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
+	return {messages, control};
+}
+
+std::shared_ptr<MediaHandlerElement> MediaHandlerElement::chainWith(std::shared_ptr<MediaHandlerElement> upstream) {
+	assert(this->upstream == nullptr);
+	assert(upstream->downstream == nullptr);
+	this->upstream = upstream;
+	upstream->downstream = shared_from_this();
+	return upstream;
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */

+ 43 - 0
src/mediahandlerrootelement.cpp

@@ -0,0 +1,43 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "mediahandlerrootelement.hpp"
+
+namespace rtc {
+
+message_ptr MediaHandlerRootElement::reduce(ChainedMessagesProduct messages) {
+	if (messages && !messages->empty()) {
+		auto msg_ptr = messages->front();
+		if (msg_ptr) {
+			return make_message(*msg_ptr);
+		} else {
+			return nullptr;
+		}
+	} else {
+		return nullptr;
+	}
+}
+
+ChainedMessagesProduct MediaHandlerRootElement::split(message_ptr message) {
+	return make_chained_messages_product(message);
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */

+ 31 - 31
src/nalunit.cpp

@@ -24,8 +24,8 @@
 namespace rtc {
 
 NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri,
-                                   uint8_t unitType, binary data)
-    : NalUnit(data.size() + 2) {
+								   uint8_t unitType, binary data)
+: NalUnit(data.size() + 2) {
 	setForbiddenBit(forbiddenBit);
 	setNRI(nri);
 	fragmentIndicator()->setUnitType(NalUnitFragmentA::nal_type_fu_A);
@@ -34,23 +34,23 @@ NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t
 	copy(data.begin(), data.end(), begin() + 2);
 }
 
-std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
-                                                              uint16_t maximumFragmentSize) {
-	assert(nalu.size() > maximumFragmentSize);
-	if (nalu.size() <= maximumFragmentSize) {
+std::vector<std::shared_ptr<NalUnitFragmentA>> NalUnitFragmentA::fragmentsFrom(std::shared_ptr<NalUnit> nalu,
+																			   uint16_t maximumFragmentSize) {
+	assert(nalu->size() > maximumFragmentSize);
+	if (nalu->size() <= maximumFragmentSize) {
 		// we need to change `maximum_fragment_size` to have at least two fragments
-		maximumFragmentSize = nalu.size() / 2;
+		maximumFragmentSize = nalu->size() / 2;
 	}
-	auto fragments_count = ceil(double(nalu.size()) / maximumFragmentSize);
-	maximumFragmentSize = ceil(nalu.size() / fragments_count);
+	auto fragments_count = ceil(double(nalu->size()) / maximumFragmentSize);
+	maximumFragmentSize = ceil(nalu->size() / fragments_count);
 
 	// 2 bytes for FU indicator and FU header
 	maximumFragmentSize -= 2;
-	auto f = nalu.forbiddenBit();
-	uint8_t nri = nalu.nri() & 0x03;
-	uint8_t naluType = nalu.unitType() & 0x1F;
-	auto payload = nalu.payload();
-	vector<NalUnitFragmentA> result{};
+	auto f = nalu->forbiddenBit();
+	uint8_t nri = nalu->nri() & 0x03;
+	uint8_t naluType = nalu->unitType() & 0x1F;
+	auto payload = nalu->payload();
+	vector<std::shared_ptr<NalUnitFragmentA>> result{};
 	uint64_t offset = 0;
 	while (offset < payload.size()) {
 		vector<byte> fragmentData;
@@ -66,7 +66,7 @@ std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
 			fragmentType = FragmentType::End;
 		}
 		fragmentData = {payload.begin() + offset, payload.begin() + offset + maximumFragmentSize};
-		NalUnitFragmentA fragment{fragmentType, f, nri, naluType, fragmentData};
+		auto fragment = std::make_shared<NalUnitFragmentA>(fragmentType, f, nri, naluType, fragmentData);
 		result.push_back(fragment);
 		offset += maximumFragmentSize;
 	}
@@ -76,26 +76,26 @@ std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
 void NalUnitFragmentA::setFragmentType(FragmentType type) {
 	fragmentHeader()->setReservedBit6(false);
 	switch (type) {
-	case FragmentType::Start:
-		fragmentHeader()->setStart(true);
-		fragmentHeader()->setEnd(false);
-		break;
-	case FragmentType::End:
-		fragmentHeader()->setStart(false);
-		fragmentHeader()->setEnd(true);
-		break;
-	default:
-		fragmentHeader()->setStart(false);
-		fragmentHeader()->setEnd(false);
+		case FragmentType::Start:
+			fragmentHeader()->setStart(true);
+			fragmentHeader()->setEnd(false);
+			break;
+		case FragmentType::End:
+			fragmentHeader()->setStart(false);
+			fragmentHeader()->setEnd(true);
+			break;
+		default:
+			fragmentHeader()->setStart(false);
+			fragmentHeader()->setEnd(false);
 	}
 }
 
-std::vector<binary> NalUnits::generateFragments(uint16_t maximumFragmentSize) {
-	vector<binary> result{};
+std::vector<std::shared_ptr<binary>> NalUnits::generateFragments(uint16_t maximumFragmentSize) {
+	vector<std::shared_ptr<binary>> result{};
 	for (auto nalu : *this) {
-		if (nalu.size() > maximumFragmentSize) {
-			std::vector<NalUnitFragmentA> fragments =
-			    NalUnitFragmentA::fragmentsFrom(nalu, maximumFragmentSize);
+		if (nalu->size() > maximumFragmentSize) {
+			std::vector<std::shared_ptr<NalUnitFragmentA>> fragments =
+			NalUnitFragmentA::fragmentsFrom(nalu, maximumFragmentSize);
 			result.insert(result.end(), fragments.begin(), fragments.end());
 		} else {
 			result.push_back(nalu);

+ 1 - 17
src/opuspacketizationhandler.cpp

@@ -23,23 +23,7 @@
 namespace rtc {
 
 OpusPacketizationHandler::OpusPacketizationHandler(std::shared_ptr<OpusRtpPacketizer> packetizer)
-    : RtcpHandler(), RtcpSenderReporter(packetizer->rtpConfig), packetizer(packetizer) {
-	senderReportOutgoingCallback = [this](message_ptr msg) { outgoingCallback(msg); };
-}
-
-message_ptr OpusPacketizationHandler::incoming(message_ptr ptr) { return ptr; }
-
-message_ptr OpusPacketizationHandler::outgoing(message_ptr ptr) {
-	if (ptr->type == Message::Binary) {
-		return withStatsRecording<message_ptr>(
-		    [this, ptr](std::function<void(message_ptr)> addToReport) {
-			    auto rtp = packetizer->packetize(*ptr, false);
-			    addToReport(rtp);
-			    return rtp;
-		    });
-	}
-	return ptr;
-}
+: MediaChainableHandler(packetizer) { }
 
 } // namespace rtc
 

+ 11 - 2
src/opusrtppacketizer.cpp

@@ -23,13 +23,22 @@
 namespace rtc {
 
 OpusRtpPacketizer::OpusRtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
-    : RtpPacketizer(rtpConfig) {}
+: RtpPacketizer(rtpConfig), MediaHandlerRootElement() {}
 
-message_ptr OpusRtpPacketizer::packetize(binary payload, bool setMark) {
+binary_ptr OpusRtpPacketizer::packetize(binary_ptr payload, bool setMark) {
 	assert(!setMark);
 	return RtpPacketizer::packetize(payload, false);
 }
 
+ChainedOutgoingProduct OpusRtpPacketizer::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
+	ChainedMessagesProduct packets = make_chained_messages_product();
+	packets->reserve(messages->size());
+	for (auto message: *messages) {
+		packets->push_back(packetize(message, false));
+	}
+	return {packets, control};
+}
+
 } // namespace rtc
 
 #endif /* RTC_ENABLE_MEDIA */

+ 118 - 0
src/rtcpnackresponder.cpp

@@ -0,0 +1,118 @@
+/**
+ * libdatachannel streamer example
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "rtcpnackresponder.hpp"
+
+namespace rtc {
+
+RtcpNackResponder::Storage::Element::Element(binary_ptr packet, uint16_t sequenceNumber, std::shared_ptr<Element> next)
+: packet(packet), sequenceNumber(sequenceNumber), next(next) { }
+
+unsigned RtcpNackResponder::Storage::size() { return storage.size(); }
+
+RtcpNackResponder::Storage::Storage(unsigned _maximumSize): maximumSize(_maximumSize) {
+	assert(maximumSize > 0);
+	storage.reserve(maximumSize);
+}
+
+std::optional<binary_ptr> RtcpNackResponder::Storage::get(uint16_t sequenceNumber) {
+	auto position = storage.find(sequenceNumber);
+	return position != storage.end() ? std::make_optional(storage.at(sequenceNumber)->packet) : nullopt;
+}
+
+void RtcpNackResponder::Storage::store(binary_ptr packet) {
+	if (!packet || packet->size() < 12) {
+		return;
+	}
+	auto rtp = reinterpret_cast<RTP *>(packet->data());
+	auto sequenceNumber = rtp->seqNumber();
+
+	assert((storage.empty() && !oldest && !newest) || (!storage.empty() && oldest && newest));
+
+	if (size() == 0) {
+		newest = std::make_shared<Element>(packet, sequenceNumber);
+		oldest = newest;
+	} else {
+		auto current = std::make_shared<Element>(packet, sequenceNumber);
+		newest->next = current;
+		newest = current;
+	}
+
+	storage.emplace(sequenceNumber, newest);
+
+	if (size() > maximumSize) {
+		assert(oldest);
+		if (oldest) {
+			storage.erase(oldest->sequenceNumber);
+			oldest = oldest->next;
+		}
+	}
+}
+
+RtcpNackResponder::RtcpNackResponder(unsigned maxStoredPacketCount)
+: MediaHandlerElement(), storage(std::make_shared<Storage>(maxStoredPacketCount)) { }
+
+ChainedIncomingControlProduct RtcpNackResponder::processIncomingControlMessage(message_ptr message) {
+	std::optional<ChainedOutgoingProduct> optPackets = ChainedOutgoingProduct(nullptr);
+	auto packets = make_chained_messages_product();
+
+	unsigned int i = 0;
+	while (i < message->size()) {
+		auto nack = reinterpret_cast<RTCP_NACK *>(message->data() + i);
+		i += nack->header.header.lengthInBytes();
+		// check if rtcp is nack
+		if (nack->header.header.payloadType() != 205 || nack->header.header.reportCount() != 1) {
+			continue;
+		}
+		
+		auto fieldsCount = nack->getSeqNoCount();
+
+		std::vector<uint16_t> missingSequenceNumbers{};
+		for(unsigned int i = 0; i < fieldsCount; i++) {
+			auto field = nack->parts[i];
+			auto newMissingSeqenceNumbers = field.getSequenceNumbers();
+			missingSequenceNumbers.insert(missingSequenceNumbers.end(), newMissingSeqenceNumbers.begin(), newMissingSeqenceNumbers.end());
+		}
+		packets->reserve(packets->size() + missingSequenceNumbers.size());
+		for (auto sequenceNumber: missingSequenceNumbers) {
+			auto optPacket = storage->get(sequenceNumber);
+			if (optPacket.has_value()) {
+				auto packet = optPacket.value();
+				packets->push_back(packet);
+			}
+		}
+	}
+
+	if (!packets->empty()) {
+		return {message, ChainedOutgoingProduct(packets)};
+	} else {
+		return {message, nullopt};
+	}
+}
+
+ChainedOutgoingProduct RtcpNackResponder::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
+	for (auto message: *messages) {
+		storage->store(message);
+	}
+	return {messages, control};
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */

+ 31 - 17
src/rtcpsenderreporter.cpp → src/rtcpsrreporter.cpp

@@ -1,5 +1,4 @@
-/*
- * libdatachannel streamer example
+/**
  * Copyright (c) 2020 Filip Klembara (in2core)
  *
  * This program is free software; you can redistribute it and/or
@@ -18,40 +17,52 @@
 
 #if RTC_ENABLE_MEDIA
 
-#include "rtcpsenderreporter.hpp"
+#include "rtcpsrreporter.hpp"
 
 namespace rtc {
 
-void RtcpSenderReporter::startRecording() {
-	_previousReportedTimestamp = rtpConfig->timestamp;
-	timeOffset = rtpConfig->startTime_s - rtpConfig->timestampToSeconds(rtpConfig->timestamp);
+ChainedOutgoingProduct RtcpSrReporter::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
+	if (needsToReport) {
+		auto timestamp = rtpConfig->timestamp;
+		auto sr = getSenderReport(timestamp);
+		if (control) {
+			control->insert(control->end(), sr->begin(), sr->end());
+		} else {
+			control = sr;
+		}
+		needsToReport = false;
+	}
+	for (auto message: *messages) {
+		auto rtp = reinterpret_cast<RTP *>(message->data());
+		addToReport(rtp, message->size());
+	}
+	return {messages, control};
 }
 
-void RtcpSenderReporter::sendReport(uint32_t timestamp) {
-	auto sr = getSenderReport(timestamp);
-	_previousReportedTimestamp = timestamp;
-	senderReportOutgoingCallback(move(sr));
+void RtcpSrReporter::startRecording() {
+	_previousReportedTimestamp = rtpConfig->timestamp;
+	timeOffset = rtpConfig->startTime_s - rtpConfig->timestampToSeconds(rtpConfig->timestamp);
 }
 
-void RtcpSenderReporter::addToReport(RTP *rtp, uint32_t rtpSize) {
+void RtcpSrReporter::addToReport(RTP *rtp, uint32_t rtpSize) {
 	packetCount += 1;
 	assert(!rtp->padding());
 	payloadOctets += rtpSize - rtp->getSize();
 }
 
-RtcpSenderReporter::RtcpSenderReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
-    : rtpConfig(rtpConfig) {}
+RtcpSrReporter::RtcpSrReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
+: MediaHandlerElement(), rtpConfig(rtpConfig) {}
 
-uint64_t RtcpSenderReporter::secondsToNTP(double seconds) {
+uint64_t RtcpSrReporter::secondsToNTP(double seconds) {
 	return std::round(seconds * double(uint64_t(1) << 32));
 }
 
-void RtcpSenderReporter::setNeedsToReport() { needsToReport = true; }
+void RtcpSrReporter::setNeedsToReport() { needsToReport = true; }
 
-message_ptr RtcpSenderReporter::getSenderReport(uint32_t timestamp) {
+message_ptr RtcpSrReporter::getSenderReport(uint32_t timestamp) {
 	auto srSize = RTCP_SR::size(0);
 	auto msg = make_message(srSize + RTCP_SDES::size({{uint8_t(rtpConfig->cname.size())}}),
-	                        Message::Type::Control);
+							Message::Type::Control);
 	auto sr = reinterpret_cast<RTCP_SR *>(msg->data());
 	auto timestamp_s = rtpConfig->timestampToSeconds(timestamp);
 	auto currentTime = timeOffset + timestamp_s;
@@ -68,6 +79,9 @@ message_ptr RtcpSenderReporter::getSenderReport(uint32_t timestamp) {
 	item->type = 1;
 	item->setText(rtpConfig->cname);
 	sdes->preparePacket(1);
+
+	_previousReportedTimestamp = timestamp;
+
 	return msg;
 }
 

+ 3 - 3
src/rtppacketizer.cpp

@@ -25,8 +25,8 @@ namespace rtc {
 RtpPacketizer::RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
     : rtpConfig(rtpConfig) {}
 
-message_ptr RtpPacketizer::packetize(binary payload, bool setMark) {
-	auto msg = make_message(rtpHeaderSize + payload.size());
+binary_ptr RtpPacketizer::packetize(std::shared_ptr<binary> payload, bool setMark) {
+	auto msg = std::make_shared<binary>(rtpHeaderSize + payload->size());
 	auto *rtp = (RTP *)msg->data();
 	rtp->setPayloadType(rtpConfig->payloadType);
 	// increase sequence number
@@ -37,7 +37,7 @@ message_ptr RtpPacketizer::packetize(binary payload, bool setMark) {
 		rtp->setMarker(true);
 	}
 	rtp->preparePacket();
-	copy(payload.begin(), payload.end(), msg->begin() + rtpHeaderSize);
+	memcpy(msg->data() + rtpHeaderSize, payload->data(), payload->size());
 	return msg;
 }
 

+ 2 - 2
src/track.cpp

@@ -161,7 +161,7 @@ bool Track::outgoing([[maybe_unused]] message_ptr message) {
 #endif
 }
 
-void Track::setRtcpHandler(std::shared_ptr<RtcpHandler> handler) {
+void Track::setRtcpHandler(std::shared_ptr<MediaHandler> handler) {
 	std::unique_lock lock(mRtcpHandlerMutex);
 	mRtcpHandler = std::move(handler);
 	if (mRtcpHandler) {
@@ -178,7 +178,7 @@ bool Track::requestKeyframe() {
 	return false;
 }
 
-std::shared_ptr<RtcpHandler> Track::getRtcpHandler() {
+std::shared_ptr<MediaHandler> Track::getRtcpHandler() {
 	std::shared_lock lock(mRtcpHandlerMutex);
 	return mRtcpHandler;
 }