Pārlūkot izejas kodu

Add chainable Rtcp Handler

Filip Klembara 4 gadi atpakaļ
vecāks
revīzija
569a317bf0

+ 8 - 2
CMakeLists.txt

@@ -67,13 +67,16 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizationconfig.cpp
-	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpsenderreporter.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpsrreporter.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizer.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/opusrtppacketizer.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/opuspacketizationhandler.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/h264rtppacketizer.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/nalunit.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/h264packetizationhandler.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpchainablehandler.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerelement.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerrootelement.cpp
 )
 
 set(LIBDATACHANNEL_WEBSOCKET_SOURCES
@@ -107,13 +110,16 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/track.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/websocket.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizationconfig.hpp
-	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpsenderreporter.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpsrreporter.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizer.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/opusrtppacketizer.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/opuspacketizationhandler.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264rtppacketizer.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/nalunit.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264packetizationhandler.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpchainablehandler.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerelement.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/messagehandlerrootelement.hpp
 )
 
 set(TESTS_SOURCES

+ 3 - 37
include/rtc/h264packetizationhandler.hpp

@@ -23,50 +23,16 @@
 
 #include "h264rtppacketizer.hpp"
 #include "nalunit.hpp"
-#include "rtcphandler.hpp"
-#include "rtcpsenderreporter.hpp"
+#include "rtcpchainablehandler.hpp"
 
 namespace rtc {
 
 /// Handler for H264 packetization
-class RTC_CPP_EXPORT H264PacketizationHandler : public RtcpHandler, public RtcpSenderReporter {
-	/// RTP packetizer for H264
-	const std::shared_ptr<H264RtpPacketizer> packetizer;
-
-	const uint16_t maximumFragmentSize;
-
-	std::shared_ptr<NalUnits> splitMessage(message_ptr message);
-
+class RTC_CPP_EXPORT H264PacketizationHandler : public RtcpChainableHandler {
 public:
-	/// Nalunit separator
-	enum class Separator {
-		LongStartSequence,  // 0x00, 0x00, 0x00, 0x01
-		ShortStartSequence, // 0x00, 0x00, 0x01
-		StartSequence,      // LongStartSequence or ShortStartSequence
-		Length              // first 4 bytes is nal unit length
-	};
-
 	/// Construct handler for H264 packetization.
-	/// @param separator Nal units separator
 	/// @param packetizer RTP packetizer for h264
-	H264PacketizationHandler(Separator separator, std::shared_ptr<H264RtpPacketizer> packetizer,
-	                         uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
-
-	/// Returns message unchanged
-	/// @param ptr message
-	message_ptr incoming(message_ptr ptr) override;
-
-	/// Returns packetized message if message type is binary
-	/// @note NAL units in `ptr` message must be separated by `separator` given in constructor
-	/// @note If message generates multiple rtp packets, all but last are send using
-	/// `outgoingCallback`. It is your responsibility to send last packet.
-	/// @param ptr message containing all NAL units for current timestamp (one sample)
-	/// @return last packet
-	message_ptr outgoing(message_ptr ptr) override;
-
-private:
-	/// Separator
-	const Separator separator;
+	H264PacketizationHandler(std::shared_ptr<H264RtpPacketizer> packetizer);
 };
 
 } // namespace rtc

+ 23 - 2
include/rtc/h264rtppacketizer.hpp

@@ -21,22 +21,43 @@
 
 #if RTC_ENABLE_MEDIA
 
+#include "nalunit.hpp"
 #include "rtppacketizer.hpp"
+#include "messagehandlerrootelement.hpp"
 
 namespace rtc {
 
 /// RTP packetization of h264 payload
-class RTC_CPP_EXPORT H264RtpPacketizer : public RtpPacketizer {
+class RTC_CPP_EXPORT H264RtpPacketizer : public RtpPacketizer, public MessageHandlerRootElement {
+	std::shared_ptr<NalUnits> splitMessage(binary_ptr message);
+	const uint16_t maximumFragmentSize;
 
 public:
 	/// Default clock rate for H264 in RTP
 	static const auto defaultClockRate = 90 * 1000;
 
+	/// Nalunit separator
+	enum class Separator {
+		LongStartSequence,  // 0x00, 0x00, 0x00, 0x01
+		ShortStartSequence, // 0x00, 0x00, 0x01
+		StartSequence,      // LongStartSequence or ShortStartSequence
+		Length              // first 4 bytes is nal unit length
+	};
+
+	H264RtpPacketizer(H264RtpPacketizer::Separator separator, std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+					  uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
+
 	/// Constructs h264 payload packetizer with given RTP configuration.
 	/// @note RTP configuration is used in packetization process which may change some configuration
 	/// properties such as sequence number.
 	/// @param rtpConfig  RTP configuration
-	H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
+	/// @param maximumFragmentSize maximum size of one NALU fragment
+	H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+					  uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
+
+	ChainedOutgoingProduct modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control) override;
+private:
+	const Separator separator;
 };
 
 } // namespace rtc

+ 1 - 0
include/rtc/include.hpp

@@ -56,6 +56,7 @@ using std::byte;
 using std::string;
 using std::string_view;
 using binary = std::vector<byte>;
+using binary_ptr = std::shared_ptr<binary>;
 
 using std::nullopt;
 

+ 118 - 0
include/rtc/messagehandlerelement.hpp

@@ -0,0 +1,118 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTC_MESSAGE_HANDLER_ELEMENT_H
+#define RTC_MESSAGE_HANDLER_ELEMENT_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "include.hpp"
+#include "message.hpp"
+#include "rtp.hpp"
+
+namespace rtc {
+
+using ChainedMessagesProduct = std::shared_ptr<std::vector<binary_ptr>>;
+
+RTC_CPP_EXPORT ChainedMessagesProduct make_chained_messages_product();
+RTC_CPP_EXPORT ChainedMessagesProduct make_chained_messages_product(message_ptr msg);
+
+/// Ougoing messages
+struct RTC_CPP_EXPORT ChainedOutgoingProduct {
+	ChainedOutgoingProduct(ChainedMessagesProduct messages, std::optional<message_ptr> control = nullopt);
+	const ChainedMessagesProduct messages;
+	const std::optional<message_ptr> control;
+};
+
+/// Ougoing response to incoming messages
+struct RTC_CPP_EXPORT ChainedOutgoingResponseProduct {
+	ChainedOutgoingResponseProduct(std::optional<ChainedMessagesProduct> messages = nullopt, std::optional<message_ptr> control = nullopt);
+	const std::optional<ChainedMessagesProduct> messages;
+	const std::optional<message_ptr> control;
+};
+
+/// Incoming messages with response
+struct RTC_CPP_EXPORT ChainedIncomingProduct {
+	ChainedIncomingProduct(std::optional<ChainedMessagesProduct> incoming = nullopt, std::optional<ChainedMessagesProduct> outgoing = nullopt);
+	const std::optional<ChainedMessagesProduct> incoming;
+	const std::optional<ChainedOutgoingResponseProduct> outgoing;
+};
+
+/// Incoming control messages with response
+struct RTC_CPP_EXPORT ChainedIncomingControlProduct {
+	ChainedIncomingControlProduct(message_ptr incoming, std::optional<ChainedMessagesProduct> outgoing = nullopt);
+	const std::optional<message_ptr> incoming;
+	const std::optional<ChainedOutgoingResponseProduct> outgoing;
+};
+
+/// Chainable handler
+class RTC_CPP_EXPORT MessageHandlerElement: public std::enable_shared_from_this<MessageHandlerElement> {
+	std::optional<std::shared_ptr<MessageHandlerElement>> upstream = nullopt;
+	std::optional<std::shared_ptr<MessageHandlerElement>> downstream = nullopt;
+
+	void prepareAndSendResponse(std::optional<ChainedOutgoingResponseProduct> outgoing, std::function<bool (ChainedOutgoingResponseProduct)> send);
+
+	void removeFromChain();
+public:
+	MessageHandlerElement();
+
+	/// Creates response to incoming message
+	/// @param messages Current repsonse
+	/// @returns New response
+	std::optional<ChainedOutgoingResponseProduct> processOutgoingResponse(ChainedOutgoingResponseProduct messages);
+
+	// Process incoming and ougoing messages
+	std::optional<message_ptr> processIncomingControl(message_ptr message, std::function<bool (ChainedOutgoingResponseProduct)> send);
+	std::optional<ChainedMessagesProduct> processIncomingBinary(ChainedMessagesProduct messages, std::function<bool (ChainedOutgoingResponseProduct)> send);
+	std::optional<message_ptr> processOutgoingControl(message_ptr message);
+	std::optional<ChainedOutgoingProduct> processOutgoingBinary(ChainedOutgoingProduct product);
+
+	/// Modifies current control message
+	/// @param messages current message
+	/// @returns Modified message and response
+	virtual ChainedIncomingControlProduct modifyIncomingControl(message_ptr messages);
+
+	/// Modifies current control message
+	/// @param messages current message
+	/// @returns Modified message
+	virtual message_ptr modifyOutgoingControl(message_ptr messages);
+
+	/// Modifies current binary message
+	/// @param messages current message
+	/// @returns Modified message and response
+	virtual ChainedIncomingProduct modifyIncomingBinary(ChainedMessagesProduct messages);
+
+	/// Modifies current binary message
+	/// @param messages current message
+	/// @param control current control message
+	/// @returns Modified binary message and control message
+	virtual ChainedOutgoingProduct modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control);
+
+	/// Set given element as upstream to this
+	/// @param upstream Upstream element
+	/// @returns Upstream element
+	std::shared_ptr<MessageHandlerElement> chainWith(std::shared_ptr<MessageHandlerElement> upstream);
+
+	/// Remove all downstream elements from chain
+	void recursiveRemoveChain();
+};
+
+} // namespace rtc
+
+#endif // RTC_ENABLE_MEDIA
+
+#endif // RTC_MESSAGE_HANDLER_ELEMENT_H

+ 45 - 0
include/rtc/messagehandlerrootelement.hpp

@@ -0,0 +1,45 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTCP_MESSAGE_HANDLER_ROOT_ELEMENT_H
+#define RTCP_MESSAGE_HANDLER_ROOT_ELEMENT_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "messagehandlerelement.hpp"
+
+namespace rtc {
+
+/// Chainable message handler
+class RTC_CPP_EXPORT MessageHandlerRootElement : public MessageHandlerElement {
+public:
+	MessageHandlerRootElement() { }
+
+	/// Reduce multiple messages into one message
+	/// @param messages Messages to reduce
+	virtual message_ptr reduce(ChainedMessagesProduct messages);
+
+	/// Splits message into multiple messages
+	/// @param message Message to split
+	virtual ChainedMessagesProduct split(message_ptr message);
+};
+
+} // namespace rtc
+
+#endif // RTC_ENABLE_MEDIA
+
+#endif // RTCP_MESSAGE_HANDLER_ROOT_ELEMENT_H

+ 5 - 6
include/rtc/nalunit.hpp

@@ -63,7 +63,7 @@ private:
 struct RTC_CPP_EXPORT NalUnit : binary {
 	NalUnit(const NalUnit &unit) = default;
 	NalUnit(size_t size, bool includingHeader = true)
-	    : binary(size + (includingHeader ? 0 : 1)) {}
+		: binary(size + (includingHeader ? 0 : 1)) {}
 
 	template <typename Iterator>
 	NalUnit(Iterator begin_, Iterator end_) : binary(begin_, end_) {}
@@ -101,9 +101,9 @@ struct RTC_CPP_EXPORT NalUnitFragmentA : NalUnit {
 	enum class FragmentType { Start, Middle, End };
 
 	NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri, uint8_t unitType,
-	                 binary data);
+					 binary data);
 
-	static std::vector<NalUnitFragmentA> fragmentsFrom(NalUnit nalu, uint16_t maximumFragmentSize);
+	static std::vector<std::shared_ptr<NalUnitFragmentA>> fragmentsFrom(std::shared_ptr<NalUnit> nalu, uint16_t maximumFragmentSize);
 
 	uint8_t unitType() { return fragmentHeader()->unitType(); }
 
@@ -142,11 +142,10 @@ protected:
 	const uint8_t nal_type_fu_A = 28;
 };
 
-class RTC_CPP_EXPORT NalUnits : public std::vector<NalUnit> {
+class RTC_CPP_EXPORT NalUnits : public std::vector<std::shared_ptr<NalUnit>> {
 public:
 	static const uint16_t defaultMaximumFragmentSize = 1400;
-	std::vector<binary>
-	generateFragments(uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
+	std::vector<std::shared_ptr<binary>> generateFragments(uint16_t maximumFragmentSize);
 };
 
 } // namespace rtc

+ 2 - 12
include/rtc/opuspacketizationhandler.hpp

@@ -22,27 +22,17 @@
 #if RTC_ENABLE_MEDIA
 
 #include "opusrtppacketizer.hpp"
-#include "rtcphandler.hpp"
-#include "rtcpsenderreporter.hpp"
+#include "rtcpchainablehandler.hpp"
 
 namespace rtc {
 
 /// Handler for opus packetization
-class RTC_CPP_EXPORT OpusPacketizationHandler : public RtcpHandler, public RtcpSenderReporter {
-	/// RTP packetizer for opus
-	const std::shared_ptr<OpusRtpPacketizer> packetizer;
+class RTC_CPP_EXPORT OpusPacketizationHandler : public RtcpChainableHandler {
 
 public:
 	/// Construct handler for opus packetization.
 	/// @param packetizer RTP packetizer for opus
 	OpusPacketizationHandler(std::shared_ptr<OpusRtpPacketizer> packetizer);
-
-	/// Returns message unchanged
-	/// @param ptr message
-	message_ptr incoming(message_ptr ptr) override;
-	/// Returns packetized message if message type is binary
-	/// @param ptr message
-	message_ptr outgoing(message_ptr ptr) override;
 };
 
 } // namespace rtc

+ 9 - 3
include/rtc/opusrtppacketizer.hpp

@@ -22,12 +22,12 @@
 #if RTC_ENABLE_MEDIA
 
 #include "rtppacketizer.hpp"
+#include "messagehandlerrootelement.hpp"
 
 namespace rtc {
 
 /// RTP packetizer for opus
-class RTC_CPP_EXPORT OpusRtpPacketizer : public RtpPacketizer {
-
+class RTC_CPP_EXPORT OpusRtpPacketizer : public RtpPacketizer, public MessageHandlerRootElement {
 public:
 	/// default clock rate used in opus RTP communication
 	static const uint32_t defaultClockRate = 48 * 1000;
@@ -42,7 +42,13 @@ public:
 	/// @note This function increase sequence number after packetization.
 	/// @param payload RTP payload
 	/// @param setMark This needs to be `false` for all RTP packets with opus payload
-	message_ptr packetize(binary payload, bool setMark) override;
+	binary_ptr packetize(binary_ptr payload, bool setMark) override;
+
+	/// Creates RTP packet for given samples (all samples share same RTP timesamp)
+	/// @param messages opus samples
+	/// @param control RTCP
+	/// @returns RTP packets and unchanged `control`
+	ChainedOutgoingProduct modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control) override;
 };
 
 } // namespace rtc

+ 4 - 1
include/rtc/rtc.h

@@ -239,6 +239,10 @@ RTC_EXPORT int rtcSetH264PacketizationHandler(int tr, uint32_t ssrc, const char
 /// @param _timestamp Timestamp
 RTC_EXPORT int rtcSetOpusPacketizationHandler(int tr, uint32_t ssrc, const char * cname, uint8_t payloadType, uint32_t clockRate, uint16_t _sequenceNumber, uint32_t _timestamp);
 
+/// Chain RtcpSRReporter to handler chain for given track
+/// @param tr Track id
+int rtcChainRtcpSRReporter(int tr);
+
 /// Set start time for RTP stream
 /// @param startTime_s Start time in seconds
 /// @param timeIntervalSince1970 Set true if `startTime_s` is time interval since 1970, false if `startTime_s` is time interval since 1900
@@ -249,7 +253,6 @@ int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeInterva
 /// @param id Track identifier
 int rtcStartRtcpSenderReporterRecording(int id);
 
-
 /// Transform seconds to timestamp using track's clock rate
 /// @param id Track id
 /// @param seconds Seconds

+ 3 - 0
include/rtc/rtc.hpp

@@ -29,6 +29,9 @@
 
 // RTCP handling
 #include "rtcpreceivingsession.hpp"
+#include "rtcpchainablehandler.hpp"
+#include "rtcpsrreporter.hpp"
+#include "rtcpnackresponder.hpp"
 
 // Opus/h264 streaming
 #include "h264packetizationhandler.hpp"

+ 55 - 0
include/rtc/rtcpchainablehandler.hpp

@@ -0,0 +1,55 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef RTC_RTCP_CHAINABLE_HANDLER_H
+#define RTC_RTCP_CHAINABLE_HANDLER_H
+
+#if RTC_ENABLE_MEDIA
+
+#include "rtcphandler.hpp"
+#include "messagehandlerrootelement.hpp"
+
+namespace rtc {
+
+class RTC_CPP_EXPORT RtcpChainableHandler : public RtcpHandler {
+	const std::shared_ptr<MessageHandlerRootElement> root;
+	std::shared_ptr<MessageHandlerElement> leaf;
+	std::mutex inoutMutex;
+
+	std::optional<message_ptr> handleIncomingBinary(message_ptr);
+	std::optional<message_ptr> handleIncomingControl(message_ptr);
+	std::optional<message_ptr> handleOutgoingBinary(message_ptr);
+	std::optional<message_ptr> handleOutgoingControl(message_ptr);
+	bool sendProduct(ChainedOutgoingResponseProduct product);
+public:
+	RtcpChainableHandler(std::shared_ptr<MessageHandlerRootElement> root);
+	~RtcpChainableHandler();
+	message_ptr incoming(message_ptr ptr) override;
+	message_ptr outgoing(message_ptr ptr) override;
+
+	bool send(message_ptr msg);
+
+	/// Adds element to chain
+	/// @param chainable Chainable element
+    void addToChain(std::shared_ptr<MessageHandlerElement> chainable);
+};
+
+} // namespace rtc
+
+#endif // RTC_ENABLE_MEDIA
+
+#endif // RTC_RTCP_CHAINABLE_HANDLER_H

+ 7 - 30
include/rtc/rtcpsenderreporter.hpp → include/rtc/rtcpsrreporter.hpp

@@ -1,5 +1,4 @@
-/*
- * libdatachannel streamer example
+/**
  * Copyright (c) 2020 Filip Klembara (in2core)
  *
  * This program is free software; you can redistribute it and/or
@@ -23,11 +22,12 @@
 
 #include "message.hpp"
 #include "rtppacketizationconfig.hpp"
+#include "messagehandlerelement.hpp"
 
 namespace rtc {
 
-/// Class for sending RTCP SR
-class RTC_CPP_EXPORT RtcpSenderReporter {
+class RTC_CPP_EXPORT RtcpSRReporter: public MessageHandlerElement {
+
 	bool needsToReport = false;
 
 	uint32_t packetCount = 0;
@@ -39,10 +39,6 @@ class RTC_CPP_EXPORT RtcpSenderReporter {
 	void addToReport(RTP *rtp, uint32_t rtpSize);
 	message_ptr getSenderReport(uint32_t timestamp);
 
-protected:
-	/// Outgoing callback for sender reports
-	synchronized_callback<message_ptr> senderReportOutgoingCallback;
-
 public:
 	static uint64_t secondsToNTP(double seconds);
 
@@ -52,7 +48,9 @@ public:
 	/// RTP configuration
 	const std::shared_ptr<RtpPacketizationConfig> rtpConfig;
 
-	RtcpSenderReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
+	RtcpSRReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
+
+	ChainedOutgoingProduct modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control) override;
 
 	/// Set `needsToReport` flag. Sender report will be sent before next RTP packet with same
 	/// timestamp.
@@ -63,27 +61,6 @@ public:
 	/// @note `time_offset = rtpConfig->startTime_s -
 	/// rtpConfig->timestampToSeconds(rtpConfig->timestamp)`
 	void startRecording();
-
-	/// Send RTCP SR with given timestamp
-	/// @param timestamp timestamp of the RTCP SR
-	void sendReport(uint32_t timestamp);
-
-protected:
-	/// Calls given block with function for statistics. Sends RTCP SR packet with current timestamp
-	/// before `block` call if `needs_to_report` flag is true.
-	/// @param block Block of code to run. This block has function for rtp stats recording.
-	template <typename T>
-	T withStatsRecording(std::function<T(std::function<void(message_ptr)>)> block) {
-		if (needsToReport) {
-			sendReport(rtpConfig->timestamp);
-			needsToReport = false;
-		}
-		auto result = block([this](message_ptr _rtp) {
-			auto rtp = reinterpret_cast<RTP *>(_rtp->data());
-			this->addToReport(rtp, _rtp->size());
-		});
-		return result;
-	}
 };
 
 } // namespace rtc

+ 1 - 1
include/rtc/rtppacketizer.hpp

@@ -44,7 +44,7 @@ public:
 	/// @note This function increase sequence number after packetization.
 	/// @param payload RTP payload
 	/// @param setMark Set marker flag in RTP packet if true
-	virtual message_ptr packetize(binary payload, bool setMark);
+	virtual std::shared_ptr<binary> packetize(std::shared_ptr<binary> payload, bool setMark);
 };
 
 } // namespace rtc

+ 43 - 16
src/capi.cpp

@@ -53,7 +53,8 @@ std::unordered_map<int, shared_ptr<PeerConnection>> peerConnectionMap;
 std::unordered_map<int, shared_ptr<DataChannel>> dataChannelMap;
 std::unordered_map<int, shared_ptr<Track>> trackMap;
 #if RTC_ENABLE_MEDIA
-std::unordered_map<int, shared_ptr<RtcpSenderReporter>> rtcpSenderMap;
+std::unordered_map<int, shared_ptr<RtcpChainableHandler>> rtcpChainableHandlerMap;
+std::unordered_map<int, shared_ptr<RtcpSRReporter>> rtcpSenderMap;
 std::unordered_map<int, shared_ptr<RtpPacketizationConfig>> rtpConfigMap;
 #endif
 #if RTC_ENABLE_WEBSOCKET
@@ -142,6 +143,7 @@ void eraseTrack(int tr) {
 		throw std::invalid_argument("Track ID does not exist");
 #if RTC_ENABLE_MEDIA
 	rtcpSenderMap.erase(tr);
+	rtcpChainableHandlerMap.erase(tr);
 	rtpConfigMap.erase(tr);
 #endif
 	userPointerMap.erase(tr);
@@ -149,25 +151,41 @@ void eraseTrack(int tr) {
 
 #if RTC_ENABLE_MEDIA
 
-shared_ptr<RtcpSenderReporter> getRTCPSender(int id) {
+shared_ptr<RtcpSRReporter> getRTCPSender(int id) {
 	std::lock_guard lock(mutex);
-	if (auto it = rtcpSenderMap.find(id); it != rtcpSenderMap.end())
+	if (auto it = rtcpSenderMap.find(id); it != rtcpSenderMap.end()) {
 		return it->second;
-	else
-		throw std::invalid_argument("RtcpSenderReporter ID does not exist");
+	} else {
+		throw std::invalid_argument("RtcpSRReporter ID does not exist");
+	}
 }
 
-void emplaceRTCPSender(shared_ptr<RtcpSenderReporter> ptr, int tr) {
+void emplaceRTCPSender(shared_ptr<RtcpSRReporter> ptr, int tr) {
 	std::lock_guard lock(mutex);
 	rtcpSenderMap.emplace(std::make_pair(tr, ptr));
 }
 
+shared_ptr<RtcpChainableHandler> getRTCPChainableHandler(int id) {
+	std::lock_guard lock(mutex);
+	if (auto it = rtcpChainableHandlerMap.find(id); it != rtcpChainableHandlerMap.end()) {
+		return it->second;
+	} else {
+		throw std::invalid_argument("RtcpChainableHandler ID does not exist");
+	}
+}
+
+void emplaceRTCPChainableHandler(shared_ptr<RtcpChainableHandler> ptr, int tr) {
+	std::lock_guard lock(mutex);
+	rtcpChainableHandlerMap.emplace(std::make_pair(tr, ptr));
+}
+
 shared_ptr<RtpPacketizationConfig> getRTPConfig(int id) {
 	std::lock_guard lock(mutex);
-	if (auto it = rtpConfigMap.find(id); it != rtpConfigMap.end())
+	if (auto it = rtpConfigMap.find(id); it != rtpConfigMap.end()) {
 		return it->second;
-	else
+	} else {
 		throw std::invalid_argument("RTPConfiguration ID does not exist");
+	}
 }
 
 void emplaceRTPConfig(shared_ptr<RtpPacketizationConfig> ptr, int tr) {
@@ -534,13 +552,12 @@ int rtcSetH264PacketizationHandler(int tr, uint32_t ssrc, const char *cname, uin
 		auto track = getTrack(tr);
 		// create RTP configuration
 		auto rtpConfig = getNewRtpPacketizationConfig(ssrc, cname, payloadType, clockRate,
-		                                              sequenceNumber, timestamp);
+													  sequenceNumber, timestamp);
 		// create packetizer
-		auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(rtpConfig));
-		// create H264 and RTCP SP handler
-		shared_ptr<H264PacketizationHandler> h264Handler(
-		    new H264PacketizationHandler(H264PacketizationHandler::Separator::Length, packetizer, maxFragmentSize));
-		emplaceRTCPSender(h264Handler, tr);
+		auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(rtpConfig, maxFragmentSize));
+		// create H264 handler
+		shared_ptr<H264PacketizationHandler> h264Handler(new H264PacketizationHandler(packetizer));
+		emplaceRTCPChainableHandler(h264Handler, tr);
 		emplaceRTPConfig(rtpConfig, tr);
 		// set handler
 		track->setRtcpHandler(h264Handler);
@@ -557,15 +574,25 @@ int rtcSetOpusPacketizationHandler(int tr, uint32_t ssrc, const char *cname, uin
 		                                              sequenceNumber, timestamp);
 		// create packetizer
 		auto packetizer = shared_ptr<OpusRtpPacketizer>(new OpusRtpPacketizer(rtpConfig));
-		// create Opus and RTCP SP handler
+		// create Opus handler
 		shared_ptr<OpusPacketizationHandler> opusHandler(new OpusPacketizationHandler(packetizer));
-		emplaceRTCPSender(opusHandler, tr);
+        emplaceRTCPChainableHandler(opusHandler, tr);
 		emplaceRTPConfig(rtpConfig, tr);
 		// set handler
 		track->setRtcpHandler(opusHandler);
 	});
 }
 
+int rtcChainRtcpSRReporter(int tr) {
+	return WRAP({
+		auto config = getRTPConfig(tr);
+		auto reporter = std::make_shared<RtcpSRReporter>(config);
+		emplaceRTCPSender(reporter, tr);
+		auto chainableHandler = getRTCPChainableHandler(tr);
+		chainableHandler->addToChain(reporter);
+	});
+}
+
 int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeIntervalSince1970,
                                     uint32_t timestamp) {
 	return WRAP({

+ 1 - 140
src/h264packetizationhandler.cpp

@@ -22,146 +22,7 @@
 
 namespace rtc {
 
-using std::function;
-using std::make_shared;
-using std::shared_ptr;
-
-typedef enum {
-	NUSM_noMatch,
-	NUSM_firstZero,
-	NUSM_secondZero,
-	NUSM_thirdZero,
-	NUSM_shortMatch,
-	NUSM_longMatch
-} NalUnitStartSequenceMatch;
-
-NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, byte _byte,
-                                                 H264PacketizationHandler::Separator separator) {
-	assert(separator != H264PacketizationHandler::Separator::Length);
-	auto byte = (uint8_t)_byte;
-	auto detectShort = separator == H264PacketizationHandler::Separator::ShortStartSequence ||
-	                   separator == H264PacketizationHandler::Separator::StartSequence;
-	auto detectLong = separator == H264PacketizationHandler::Separator::LongStartSequence ||
-	                  separator == H264PacketizationHandler::Separator::StartSequence;
-	switch (match) {
-	case NUSM_noMatch:
-		if (byte == 0x00) {
-			return NUSM_firstZero;
-		}
-		break;
-	case NUSM_firstZero:
-		if (byte == 0x00) {
-			return NUSM_secondZero;
-		}
-		break;
-	case NUSM_secondZero:
-		if (byte == 0x00 && detectLong) {
-			return NUSM_thirdZero;
-		} else if (byte == 0x01 && detectShort) {
-			return NUSM_shortMatch;
-		}
-		break;
-	case NUSM_thirdZero:
-		if (byte == 0x01 && detectLong) {
-			return NUSM_longMatch;
-		}
-		break;
-	case NUSM_shortMatch:
-		return NUSM_shortMatch;
-	case NUSM_longMatch:
-		return NUSM_longMatch;
-	}
-	return NUSM_noMatch;
-}
-
-message_ptr H264PacketizationHandler::incoming(message_ptr ptr) { return ptr; }
-
-shared_ptr<NalUnits> H264PacketizationHandler::splitMessage(message_ptr message) {
-	auto nalus = make_shared<NalUnits>();
-	if (separator == Separator::Length) {
-		unsigned long long index = 0;
-		while (index < message->size()) {
-			assert(index + 4 < message->size());
-			if (index + 4 >= message->size()) {
-				LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!";
-				break;
-			}
-			auto lengthPtr = (uint32_t *)(message->data() + index);
-			uint32_t length = ntohl(*lengthPtr);
-			auto naluStartIndex = index + 4;
-			auto naluEndIndex = naluStartIndex + length;
-
-			assert(naluEndIndex <= message->size());
-			if (naluEndIndex > message->size()) {
-				LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!";
-				break;
-			}
-			nalus->push_back(
-			    NalUnit(message->begin() + naluStartIndex, message->begin() + naluEndIndex));
-			index = naluEndIndex;
-		}
-	} else {
-		NalUnitStartSequenceMatch match = NUSM_noMatch;
-		unsigned long long index = 0;
-		while (index < message->size()) {
-			match = StartSequenceMatchSucc(match, (*message)[index++], separator);
-			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
-				match = NUSM_noMatch;
-				break;
-			}
-		}
-		index++;
-		unsigned long long naluStartIndex = index;
-
-		while (index < message->size()) {
-			match = StartSequenceMatchSucc(match, (*message)[index], separator);
-			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
-				auto sequenceLength = match == NUSM_longMatch ? 4 : 3;
-				unsigned long long naluEndIndex = index - sequenceLength;
-				match = NUSM_noMatch;
-				nalus->push_back(NalUnit(message->begin() + naluStartIndex,
-				                         message->begin() + naluEndIndex + 1));
-				naluStartIndex = index + 1;
-			}
-			index++;
-		}
-		nalus->push_back(NalUnit(message->begin() + naluStartIndex, message->end()));
-	}
-	return nalus;
-}
-
-message_ptr H264PacketizationHandler::outgoing(message_ptr ptr) {
-	if (ptr->type == Message::Binary) {
-		auto nalus = splitMessage(ptr);
-		auto fragments = nalus->generateFragments(maximumFragmentSize);
-
-		auto lastPacket = withStatsRecording<message_ptr>(
-		    [fragments, this](function<void(message_ptr)> addToReport) {
-			    for (unsigned long long index = 0; index < fragments.size() - 1; index++) {
-				    auto packet = packetizer->packetize(fragments[index], false);
-
-				    addToReport(packet);
-
-				    outgoingCallback(std::move(packet));
-			    }
-			    // packet is last, marker must be set
-			    auto lastPacket = packetizer->packetize(fragments[fragments.size() - 1], true);
-			    addToReport(lastPacket);
-			    return lastPacket;
-		    });
-		return lastPacket;
-	}
-	return ptr;
-}
-
-H264PacketizationHandler::H264PacketizationHandler(Separator separator,
-                                                   std::shared_ptr<H264RtpPacketizer> packetizer,
-                                                   uint16_t maximumFragmentSize)
-    : RtcpHandler(), RtcpSenderReporter(packetizer->rtpConfig), packetizer(packetizer),
-      maximumFragmentSize(maximumFragmentSize), separator(separator) {
-
-	senderReportOutgoingCallback = [this](message_ptr msg) { outgoingCallback(msg); };
-}
+H264PacketizationHandler::H264PacketizationHandler(std::shared_ptr<H264RtpPacketizer> packetizer): RtcpChainableHandler(packetizer) { }
 
 } // namespace rtc
 

+ 133 - 2
src/h264rtppacketizer.cpp

@@ -22,8 +22,139 @@
 
 namespace rtc {
 
-H264RtpPacketizer::H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
-    : RtpPacketizer(rtpConfig) {}
+using std::make_shared;
+using std::shared_ptr;
+
+typedef enum {
+	NUSM_noMatch,
+	NUSM_firstZero,
+	NUSM_secondZero,
+	NUSM_thirdZero,
+	NUSM_shortMatch,
+	NUSM_longMatch
+} NalUnitStartSequenceMatch;
+
+NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, byte _byte,
+												 H264RtpPacketizer::Separator separator) {
+	assert(separator != H264RtpPacketizer::Separator::Length);
+	auto byte = (uint8_t)_byte;
+	auto detectShort = separator == H264RtpPacketizer::Separator::ShortStartSequence ||
+	separator == H264RtpPacketizer::Separator::StartSequence;
+	auto detectLong = separator == H264RtpPacketizer::Separator::LongStartSequence ||
+	separator == H264RtpPacketizer::Separator::StartSequence;
+	switch (match) {
+		case NUSM_noMatch:
+			if (byte == 0x00) {
+				return NUSM_firstZero;
+			}
+			break;
+		case NUSM_firstZero:
+			if (byte == 0x00) {
+				return NUSM_secondZero;
+			}
+			break;
+		case NUSM_secondZero:
+			if (byte == 0x00 && detectLong) {
+				return NUSM_thirdZero;
+			} else if (byte == 0x01 && detectShort) {
+				return NUSM_shortMatch;
+			}
+			break;
+		case NUSM_thirdZero:
+			if (byte == 0x01 && detectLong) {
+				return NUSM_longMatch;
+			}
+			break;
+		case NUSM_shortMatch:
+			return NUSM_shortMatch;
+		case NUSM_longMatch:
+			return NUSM_longMatch;
+	}
+	return NUSM_noMatch;
+}
+
+shared_ptr<NalUnits> H264RtpPacketizer::splitMessage(binary_ptr message) {
+	auto nalus = make_shared<NalUnits>();
+	if (separator == Separator::Length) {
+		unsigned long long index = 0;
+		while (index < message->size()) {
+			assert(index + 4 < message->size());
+			if (index + 4 >= message->size()) {
+				LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!";
+				break;
+			}
+			auto lengthPtr = (uint32_t *)(message->data() + index);
+			uint32_t length = ntohl(*lengthPtr);
+			auto naluStartIndex = index + 4;
+			auto naluEndIndex = naluStartIndex + length;
+
+			assert(naluEndIndex <= message->size());
+			if (naluEndIndex > message->size()) {
+				LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!";
+				break;
+			}
+			auto begin = message->begin() + naluStartIndex;
+			auto end = message->begin() + naluEndIndex;
+			nalus->push_back(std::make_shared<NalUnit>(begin, end));
+			index = naluEndIndex;
+		}
+	} else {
+		NalUnitStartSequenceMatch match = NUSM_noMatch;
+		unsigned long long index = 0;
+		while (index < message->size()) {
+			match = StartSequenceMatchSucc(match, (*message)[index++], separator);
+			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
+				match = NUSM_noMatch;
+				break;
+			}
+		}
+		index++;
+		unsigned long long naluStartIndex = index;
+
+		while (index < message->size()) {
+			match = StartSequenceMatchSucc(match, (*message)[index], separator);
+			if (match == NUSM_longMatch || match == NUSM_shortMatch) {
+				auto sequenceLength = match == NUSM_longMatch ? 4 : 3;
+				unsigned long long naluEndIndex = index - sequenceLength;
+				match = NUSM_noMatch;
+				auto begin = message->begin() + naluStartIndex;
+				auto end = message->begin() + naluEndIndex + 1;
+				nalus->push_back(std::make_shared<NalUnit>(begin, end));
+				naluStartIndex = index + 1;
+			}
+			index++;
+		}
+		auto begin = message->begin() + naluStartIndex;
+		auto end = message->end();
+		nalus->push_back(std::make_shared<NalUnit>(begin, end));
+	}
+	return nalus;
+}
+
+H264RtpPacketizer::H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+									 uint16_t maximumFragmentSize)
+: RtpPacketizer(rtpConfig), MessageHandlerRootElement(), maximumFragmentSize(maximumFragmentSize), separator(Separator::Length) {}
+
+H264RtpPacketizer::H264RtpPacketizer(H264RtpPacketizer::Separator separator, std::shared_ptr<RtpPacketizationConfig> rtpConfig,
+									 uint16_t maximumFragmentSize)
+: RtpPacketizer(rtpConfig), MessageHandlerRootElement(), maximumFragmentSize(maximumFragmentSize), separator(separator) {}
+
+ChainedOutgoingProduct H264RtpPacketizer::modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control) {
+	ChainedMessagesProduct packets = std::make_shared<std::vector<binary_ptr>>();
+	for (auto message: *messages) {
+		auto nalus = splitMessage(message);
+		auto fragments = nalus->generateFragments(maximumFragmentSize);
+		if (fragments.size() == 0) {
+			return ChainedOutgoingProduct({});
+		}
+		unsigned i = 0;
+		for (; i < fragments.size() - 1; i++) {
+			packets->push_back(packetize(fragments[i], false));
+		}
+		packets->push_back(packetize(fragments[i], true));
+	}
+	return {packets, control};
+}
 
 } // namespace rtc
 

+ 217 - 0
src/messagehandlerelement.cpp

@@ -0,0 +1,217 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "messagehandlerelement.hpp"
+
+namespace rtc {
+
+ChainedMessagesProduct make_chained_messages_product() {
+	std::vector<binary_ptr> msgs = {};
+	return std::make_shared<std::vector<binary_ptr>>(msgs);
+}
+
+ChainedMessagesProduct make_chained_messages_product(message_ptr msg) {
+	std::vector<binary_ptr> msgs = {msg};
+	return std::make_shared<std::vector<binary_ptr>>(msgs);
+}
+
+ChainedOutgoingProduct::ChainedOutgoingProduct(ChainedMessagesProduct messages, std::optional<message_ptr> control)
+: messages(messages), control(control) { }
+
+ChainedOutgoingResponseProduct::ChainedOutgoingResponseProduct(std::optional<ChainedMessagesProduct> messages, std::optional<message_ptr> control)
+: messages(messages), control(control) { }
+
+ChainedIncomingProduct::ChainedIncomingProduct(std::optional<ChainedMessagesProduct> incoming, std::optional<ChainedMessagesProduct> outgoing)
+: incoming(incoming), outgoing(outgoing) { }
+
+ChainedIncomingControlProduct::ChainedIncomingControlProduct(message_ptr incoming, std::optional<ChainedMessagesProduct> outgoing)
+: incoming(incoming), outgoing(outgoing) { }
+
+MessageHandlerElement::MessageHandlerElement() { }
+
+void MessageHandlerElement::removeFromChain() {
+	if (upstream.has_value()) {
+		upstream.value()->downstream = downstream;
+	}
+	if (downstream.has_value()) {
+		downstream.value()->upstream = upstream;
+	}
+	upstream = nullopt;
+	downstream = nullopt;
+}
+
+void MessageHandlerElement::recursiveRemoveChain() {
+	if (downstream.has_value()) {
+		downstream.value()->recursiveRemoveChain();
+	}
+	removeFromChain();
+}
+
+std::optional<ChainedOutgoingResponseProduct> MessageHandlerElement::processOutgoingResponse(ChainedOutgoingResponseProduct messages) {
+	if (messages.messages.has_value()) {
+		if (upstream.has_value()) {
+			auto msgs = upstream.value()->processOutgoingBinary(ChainedOutgoingProduct(messages.messages.value(), messages.control));
+			if (msgs.has_value()) {
+				auto messages = msgs.value();
+				return ChainedOutgoingResponseProduct(std::make_optional(messages.messages), messages.control);
+			} else {
+				LOG_ERROR << "Generating outgoing control message failed";
+				return nullopt;
+			}
+		} else {
+			return messages;
+		}
+	} else if (messages.control.has_value()) {
+		if (upstream.has_value()) {
+			auto control = upstream.value()->processOutgoingControl(messages.control.value());
+			if (control.has_value()) {
+				return ChainedOutgoingResponseProduct(nullopt, control.value());
+			} else {
+				LOG_ERROR << "Generating outgoing control message failed";
+				return nullopt;
+			}
+		} else {
+			return messages;
+		}
+	} else {
+		return ChainedOutgoingResponseProduct();
+	}
+}
+
+void MessageHandlerElement::prepareAndSendResponse(std::optional<ChainedOutgoingResponseProduct> outgoing, std::function<bool (ChainedOutgoingResponseProduct)> send) {
+	if (outgoing.has_value()) {
+		auto message = outgoing.value();
+		auto response = processOutgoingResponse(message);
+		if (response.has_value()) {
+			if(!send(response.value())) {
+				LOG_DEBUG << "Send failed";
+			}
+		} else {
+			LOG_DEBUG << "No response to send";
+		}
+	}
+}
+
+std::optional<message_ptr> MessageHandlerElement::processIncomingControl(message_ptr message, std::function<bool (ChainedOutgoingResponseProduct)> send) {
+	assert(message);
+	auto product = modifyIncomingControl(message);
+	prepareAndSendResponse(product.outgoing, send);
+	if (product.incoming.has_value()) {
+		if (downstream.has_value()) {
+			if (product.incoming.value()) {
+				return downstream.value()->processIncomingControl(product.incoming.value(), send);
+			} else {
+				LOG_DEBUG << "Failed to generate incoming message";
+				return nullopt;
+			}
+		} else {
+			return product.incoming;
+		}
+	} else {
+		return product.incoming;
+	}
+}
+
+std::optional<ChainedMessagesProduct> MessageHandlerElement::processIncomingBinary(ChainedMessagesProduct messages, std::function<bool (ChainedOutgoingResponseProduct)> send) {
+	assert(messages && !messages->empty());
+	auto product = modifyIncomingBinary(messages);
+	prepareAndSendResponse(product.outgoing, send);
+	if (product.incoming.has_value()) {
+		if (downstream.has_value()) {
+			if (product.incoming.value()) {
+				return downstream.value()->processIncomingBinary(product.incoming.value(), send);
+			} else {
+				LOG_ERROR << "Failed to generate incoming message";
+				return nullopt;
+			}
+		} else {
+			return product.incoming;
+		}
+	} else {
+		return product.incoming;
+	}
+}
+
+std::optional<message_ptr> MessageHandlerElement::processOutgoingControl(message_ptr message) {
+	assert(message);
+	auto newMessage = modifyOutgoingControl(message);
+	assert(newMessage);
+	if(!newMessage) {
+		LOG_ERROR << "Failed to generate outgoing message";
+		return nullopt;
+	}
+	if (upstream.has_value()) {
+		return upstream.value()->processOutgoingControl(newMessage);
+	} else {
+		return newMessage;
+	}
+}
+
+std::optional<ChainedOutgoingProduct> MessageHandlerElement::processOutgoingBinary(ChainedOutgoingProduct product) {
+	assert(product.messages && !product.messages->empty());
+	auto newProduct = modifyOutgoingBinary(product.messages, product.control);
+	assert(!product.control.has_value() || newProduct.control.has_value());
+	assert(!newProduct.control.has_value() || newProduct.control.value());
+	assert(newProduct.messages && !newProduct.messages->empty());
+	if (product.control.has_value() && !newProduct.control.has_value()) {
+		LOG_ERROR << "Outgoing message must not remove control message";
+		return nullopt;
+	}
+	if (newProduct.control.has_value() && !newProduct.control.value()) {
+		LOG_ERROR << "Failed to generate control message";
+		return nullopt;
+	}
+	if (!newProduct.messages || newProduct.messages->empty()) {
+		LOG_ERROR << "Failed to generate message";
+		return nullopt;
+	}
+	if (upstream.has_value()) {
+		return upstream.value()->processOutgoingBinary(newProduct);
+	} else {
+		return newProduct;
+	}
+}
+
+ChainedIncomingControlProduct MessageHandlerElement::modifyIncomingControl(message_ptr messages) {
+	return {messages};
+}
+
+message_ptr MessageHandlerElement::modifyOutgoingControl(message_ptr messages) {
+	return messages;
+}
+
+ChainedIncomingProduct MessageHandlerElement::modifyIncomingBinary(ChainedMessagesProduct messages) {
+	return {messages};
+}
+
+ChainedOutgoingProduct MessageHandlerElement::modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control) {
+	return {messages, control};
+}
+
+std::shared_ptr<MessageHandlerElement> MessageHandlerElement::chainWith(std::shared_ptr<MessageHandlerElement> upstream) {
+	assert(this->upstream == nullopt);
+	assert(upstream->downstream == nullopt);
+	this->upstream = upstream;
+	upstream->downstream = shared_from_this();
+	return upstream;
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */

+ 43 - 0
src/messagehandlerrootelement.cpp

@@ -0,0 +1,43 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "messagehandlerrootelement.hpp"
+
+namespace rtc {
+
+message_ptr MessageHandlerRootElement::reduce(ChainedMessagesProduct messages) {
+	if (messages && !messages->empty()) {
+		auto msg_ptr = messages->front();
+		if (msg_ptr) {
+			return make_message(*msg_ptr);
+		} else {
+			return nullptr;
+		}
+	} else {
+		return nullptr;
+	}
+}
+
+ChainedMessagesProduct MessageHandlerRootElement::split(message_ptr message) {
+	return make_chained_messages_product(message);
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */

+ 31 - 31
src/nalunit.cpp

@@ -24,8 +24,8 @@
 namespace rtc {
 
 NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri,
-                                   uint8_t unitType, binary data)
-    : NalUnit(data.size() + 2) {
+								   uint8_t unitType, binary data)
+: NalUnit(data.size() + 2) {
 	setForbiddenBit(forbiddenBit);
 	setNRI(nri);
 	fragmentIndicator()->setUnitType(NalUnitFragmentA::nal_type_fu_A);
@@ -34,23 +34,23 @@ NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t
 	copy(data.begin(), data.end(), begin() + 2);
 }
 
-std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
-                                                              uint16_t maximumFragmentSize) {
-	assert(nalu.size() > maximumFragmentSize);
-	if (nalu.size() <= maximumFragmentSize) {
+std::vector<std::shared_ptr<NalUnitFragmentA>> NalUnitFragmentA::fragmentsFrom(std::shared_ptr<NalUnit> nalu,
+																			   uint16_t maximumFragmentSize) {
+	assert(nalu->size() > maximumFragmentSize);
+	if (nalu->size() <= maximumFragmentSize) {
 		// we need to change `maximum_fragment_size` to have at least two fragments
-		maximumFragmentSize = nalu.size() / 2;
+		maximumFragmentSize = nalu->size() / 2;
 	}
-	auto fragments_count = ceil(double(nalu.size()) / maximumFragmentSize);
-	maximumFragmentSize = ceil(nalu.size() / fragments_count);
+	auto fragments_count = ceil(double(nalu->size()) / maximumFragmentSize);
+	maximumFragmentSize = ceil(nalu->size() / fragments_count);
 
 	// 2 bytes for FU indicator and FU header
 	maximumFragmentSize -= 2;
-	auto f = nalu.forbiddenBit();
-	uint8_t nri = nalu.nri() & 0x03;
-	uint8_t naluType = nalu.unitType() & 0x1F;
-	auto payload = nalu.payload();
-	vector<NalUnitFragmentA> result{};
+	auto f = nalu->forbiddenBit();
+	uint8_t nri = nalu->nri() & 0x03;
+	uint8_t naluType = nalu->unitType() & 0x1F;
+	auto payload = nalu->payload();
+	vector<std::shared_ptr<NalUnitFragmentA>> result{};
 	uint64_t offset = 0;
 	while (offset < payload.size()) {
 		vector<byte> fragmentData;
@@ -66,7 +66,7 @@ std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
 			fragmentType = FragmentType::End;
 		}
 		fragmentData = {payload.begin() + offset, payload.begin() + offset + maximumFragmentSize};
-		NalUnitFragmentA fragment{fragmentType, f, nri, naluType, fragmentData};
+		auto fragment = std::make_shared<NalUnitFragmentA>(fragmentType, f, nri, naluType, fragmentData);
 		result.push_back(fragment);
 		offset += maximumFragmentSize;
 	}
@@ -76,26 +76,26 @@ std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
 void NalUnitFragmentA::setFragmentType(FragmentType type) {
 	fragmentHeader()->setReservedBit6(false);
 	switch (type) {
-	case FragmentType::Start:
-		fragmentHeader()->setStart(true);
-		fragmentHeader()->setEnd(false);
-		break;
-	case FragmentType::End:
-		fragmentHeader()->setStart(false);
-		fragmentHeader()->setEnd(true);
-		break;
-	default:
-		fragmentHeader()->setStart(false);
-		fragmentHeader()->setEnd(false);
+		case FragmentType::Start:
+			fragmentHeader()->setStart(true);
+			fragmentHeader()->setEnd(false);
+			break;
+		case FragmentType::End:
+			fragmentHeader()->setStart(false);
+			fragmentHeader()->setEnd(true);
+			break;
+		default:
+			fragmentHeader()->setStart(false);
+			fragmentHeader()->setEnd(false);
 	}
 }
 
-std::vector<binary> NalUnits::generateFragments(uint16_t maximumFragmentSize) {
-	vector<binary> result{};
+std::vector<std::shared_ptr<binary>> NalUnits::generateFragments(uint16_t maximumFragmentSize) {
+	vector<std::shared_ptr<binary>> result{};
 	for (auto nalu : *this) {
-		if (nalu.size() > maximumFragmentSize) {
-			std::vector<NalUnitFragmentA> fragments =
-			    NalUnitFragmentA::fragmentsFrom(nalu, maximumFragmentSize);
+		if (nalu->size() > maximumFragmentSize) {
+			std::vector<std::shared_ptr<NalUnitFragmentA>> fragments =
+			NalUnitFragmentA::fragmentsFrom(nalu, maximumFragmentSize);
 			result.insert(result.end(), fragments.begin(), fragments.end());
 		} else {
 			result.push_back(nalu);

+ 1 - 17
src/opuspacketizationhandler.cpp

@@ -23,23 +23,7 @@
 namespace rtc {
 
 OpusPacketizationHandler::OpusPacketizationHandler(std::shared_ptr<OpusRtpPacketizer> packetizer)
-    : RtcpHandler(), RtcpSenderReporter(packetizer->rtpConfig), packetizer(packetizer) {
-	senderReportOutgoingCallback = [this](message_ptr msg) { outgoingCallback(msg); };
-}
-
-message_ptr OpusPacketizationHandler::incoming(message_ptr ptr) { return ptr; }
-
-message_ptr OpusPacketizationHandler::outgoing(message_ptr ptr) {
-	if (ptr->type == Message::Binary) {
-		return withStatsRecording<message_ptr>(
-		    [this, ptr](std::function<void(message_ptr)> addToReport) {
-			    auto rtp = packetizer->packetize(*ptr, false);
-			    addToReport(rtp);
-			    return rtp;
-		    });
-	}
-	return ptr;
-}
+: RtcpChainableHandler(packetizer) { }
 
 } // namespace rtc
 

+ 11 - 2
src/opusrtppacketizer.cpp

@@ -23,13 +23,22 @@
 namespace rtc {
 
 OpusRtpPacketizer::OpusRtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
-    : RtpPacketizer(rtpConfig) {}
+: RtpPacketizer(rtpConfig), MessageHandlerRootElement() {}
 
-message_ptr OpusRtpPacketizer::packetize(binary payload, bool setMark) {
+binary_ptr OpusRtpPacketizer::packetize(binary_ptr payload, bool setMark) {
 	assert(!setMark);
 	return RtpPacketizer::packetize(payload, false);
 }
 
+ChainedOutgoingProduct OpusRtpPacketizer::modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control) {
+	ChainedMessagesProduct packets = make_chained_messages_product();
+	packets->reserve(messages->size());
+	for (auto message: *messages) {
+		packets->push_back(packetize(message, false));
+	}
+	return {packets, control};
+}
+
 } // namespace rtc
 
 #endif /* RTC_ENABLE_MEDIA */

+ 166 - 0
src/rtcpchainablehandler.cpp

@@ -0,0 +1,166 @@
+/**
+ * Copyright (c) 2020 Filip Klembara (in2core)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#if RTC_ENABLE_MEDIA
+
+#include "rtcpchainablehandler.hpp"
+
+namespace rtc {
+
+RtcpChainableHandler::RtcpChainableHandler(std::shared_ptr<MessageHandlerRootElement> root): root(root), leaf(root) { }
+
+RtcpChainableHandler::~RtcpChainableHandler() {
+	leaf->recursiveRemoveChain();
+}
+
+bool RtcpChainableHandler::sendProduct(ChainedOutgoingResponseProduct product) {
+	bool result = true;
+	if (product.control.has_value()) {
+		auto sendResult = send(product.control.value());
+		if(!sendResult) {
+			LOG_DEBUG << "Failed to send control message";
+		}
+		result = result && sendResult;
+	}
+	if (product.messages.has_value()) {
+		auto messages = product.messages.value();
+		for (unsigned i = 0; i < messages->size(); i++) {
+			auto message = messages->at(i);
+			if (!message) {
+				LOG_DEBUG << "Invalid message to send " << i + 1 << "/" << messages->size();
+			}
+			auto sendResult = send(make_message(*message));
+			if(!sendResult) {
+				LOG_DEBUG << "Failed to send message " << i + 1 << "/" << messages->size();
+			}
+			result = result && sendResult;
+		}
+	}
+	return result;
+}
+
+std::optional<message_ptr> RtcpChainableHandler::handleIncomingBinary(message_ptr msg) {
+	assert(msg->type == Message::Binary);
+	auto messages = root->split(msg);
+	auto incoming = leaf->processIncomingBinary(messages, [this](ChainedOutgoingResponseProduct outgoing) {
+		return sendProduct(outgoing);
+	});
+	if (incoming.has_value()) {
+		return root->reduce(incoming.value());
+	} else {
+		return nullopt;
+	}
+}
+
+std::optional<message_ptr> RtcpChainableHandler::handleIncomingControl(message_ptr msg) {
+	assert(msg->type == Message::Control);
+	auto incoming = leaf->processIncomingControl(msg, [this](ChainedOutgoingResponseProduct outgoing) {
+		return sendProduct(outgoing);
+	});
+	assert(!incoming.has_value() || incoming.value()->type == Message::Control);
+	return incoming;
+}
+
+std::optional<message_ptr> RtcpChainableHandler::handleOutgoingBinary(message_ptr msg) {
+	assert(msg->type == Message::Binary);
+	auto messages = make_chained_messages_product(msg);
+	auto optOutgoing = root->processOutgoingBinary(ChainedOutgoingProduct(messages));
+	if (!optOutgoing.has_value()) {
+		LOG_ERROR << "Generating outgoing message failed";
+		return nullopt;
+	}
+	auto outgoing = optOutgoing.value();
+	if (outgoing.control.has_value()) {
+		if(!send(outgoing.control.value())) {
+			LOG_DEBUG << "Failed to send control message";
+		}
+	}
+	auto lastMessage = outgoing.messages->back();
+	if (!lastMessage) {
+		LOG_DEBUG << "Invalid message to send";
+		return nullopt;
+	}
+	for (unsigned i = 0; i < outgoing.messages->size() - 1; i++) {
+		auto message = outgoing.messages->at(i);
+		if (!message) {
+			LOG_DEBUG << "Invalid message to send " << i + 1 << "/" << outgoing.messages->size();
+		}
+		if(!send(make_message(*message))) {
+			LOG_DEBUG << "Failed to send message " << i + 1 << "/" << outgoing.messages->size();
+		}
+	}
+	return make_message(*lastMessage);
+}
+
+std::optional<message_ptr> RtcpChainableHandler::handleOutgoingControl(message_ptr msg) {
+	assert(msg->type == Message::Control);
+	auto optOutgoing = root->processOutgoingControl(msg);
+	assert(!optOutgoing.has_value() || optOutgoing.value()->type == Message::Control);
+	if (!optOutgoing.has_value()) {
+		LOG_ERROR << "Generating outgoing control message failed";
+		return nullopt;
+	}
+	return optOutgoing.value();
+}
+
+message_ptr RtcpChainableHandler::outgoing(message_ptr ptr) {
+	assert(ptr);
+	if (!ptr) {
+		LOG_ERROR << "Outgoing message is nullptr, ignoring";
+		return nullptr;
+	}
+	std::lock_guard<std::mutex> guard(inoutMutex);
+	if (ptr->type == Message::Binary) {
+		return handleOutgoingBinary(ptr).value_or(nullptr);
+	} else if (ptr->type == Message::Control) {
+		return handleOutgoingControl(ptr).value_or(nullptr);
+	}
+	return ptr;
+}
+
+message_ptr RtcpChainableHandler::incoming(message_ptr ptr) {
+	if (!ptr) {
+		LOG_ERROR << "Incoming message is nullptr, ignoring";
+		return nullptr;
+	}
+	std::lock_guard<std::mutex> guard(inoutMutex);
+	if (ptr->type == Message::Binary) {
+		return handleIncomingBinary(ptr).value_or(nullptr);
+	} else if (ptr->type == Message::Control) {
+		return handleIncomingControl(ptr).value_or(nullptr);
+	}
+	return ptr;
+}
+
+bool RtcpChainableHandler::send(message_ptr msg) {
+	try {
+		outgoingCallback(std::move(msg));
+		return true;
+	} catch (const std::exception &e) {
+		LOG_DEBUG << "Send in RTCP chain handler failed: " << e.what();
+	}
+	return false;
+}
+
+void RtcpChainableHandler::addToChain(std::shared_ptr<MessageHandlerElement> chainable) {
+	assert(leaf);
+	leaf = leaf->chainWith(chainable);
+}
+
+} // namespace rtc
+
+#endif /* RTC_ENABLE_MEDIA */

+ 32 - 17
src/rtcpsenderreporter.cpp → src/rtcpsrreporter.cpp

@@ -1,5 +1,4 @@
-/*
- * libdatachannel streamer example
+/**
  * Copyright (c) 2020 Filip Klembara (in2core)
  *
  * This program is free software; you can redistribute it and/or
@@ -18,40 +17,53 @@
 
 #if RTC_ENABLE_MEDIA
 
-#include "rtcpsenderreporter.hpp"
+#include "rtcpsrreporter.hpp"
 
 namespace rtc {
 
-void RtcpSenderReporter::startRecording() {
-	_previousReportedTimestamp = rtpConfig->timestamp;
-	timeOffset = rtpConfig->startTime_s - rtpConfig->timestampToSeconds(rtpConfig->timestamp);
+ChainedOutgoingProduct RtcpSRReporter::modifyOutgoingBinary(ChainedMessagesProduct messages, std::optional<message_ptr> control) {
+	if (needsToReport) {
+		auto timestamp = rtpConfig->timestamp;
+		auto sr = getSenderReport(timestamp);
+		if (control.has_value()) {
+			auto rtcp = control.value();
+			rtcp->insert(rtcp->end(), sr->begin(), sr->end());
+		} else {
+			control = sr;
+		}
+		needsToReport = false;
+	}
+	for (auto message: *messages) {
+		auto rtp = reinterpret_cast<RTP *>(message->data());
+		addToReport(rtp, message->size());
+	}
+	return {messages, control};
 }
 
-void RtcpSenderReporter::sendReport(uint32_t timestamp) {
-	auto sr = getSenderReport(timestamp);
-	_previousReportedTimestamp = timestamp;
-	senderReportOutgoingCallback(move(sr));
+void RtcpSRReporter::startRecording() {
+	_previousReportedTimestamp = rtpConfig->timestamp;
+	timeOffset = rtpConfig->startTime_s - rtpConfig->timestampToSeconds(rtpConfig->timestamp);
 }
 
-void RtcpSenderReporter::addToReport(RTP *rtp, uint32_t rtpSize) {
+void RtcpSRReporter::addToReport(RTP *rtp, uint32_t rtpSize) {
 	packetCount += 1;
 	assert(!rtp->padding());
 	payloadOctets += rtpSize - rtp->getSize();
 }
 
-RtcpSenderReporter::RtcpSenderReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
-    : rtpConfig(rtpConfig) {}
+RtcpSRReporter::RtcpSRReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
+: MessageHandlerElement(), rtpConfig(rtpConfig) {}
 
-uint64_t RtcpSenderReporter::secondsToNTP(double seconds) {
+uint64_t RtcpSRReporter::secondsToNTP(double seconds) {
 	return std::round(seconds * double(uint64_t(1) << 32));
 }
 
-void RtcpSenderReporter::setNeedsToReport() { needsToReport = true; }
+void RtcpSRReporter::setNeedsToReport() { needsToReport = true; }
 
-message_ptr RtcpSenderReporter::getSenderReport(uint32_t timestamp) {
+message_ptr RtcpSRReporter::getSenderReport(uint32_t timestamp) {
 	auto srSize = RTCP_SR::size(0);
 	auto msg = make_message(srSize + RTCP_SDES::size({{uint8_t(rtpConfig->cname.size())}}),
-	                        Message::Type::Control);
+							Message::Type::Control);
 	auto sr = reinterpret_cast<RTCP_SR *>(msg->data());
 	auto timestamp_s = rtpConfig->timestampToSeconds(timestamp);
 	auto currentTime = timeOffset + timestamp_s;
@@ -68,6 +80,9 @@ message_ptr RtcpSenderReporter::getSenderReport(uint32_t timestamp) {
 	item->type = 1;
 	item->setText(rtpConfig->cname);
 	sdes->preparePacket(1);
+
+	_previousReportedTimestamp = timestamp;
+
 	return msg;
 }
 

+ 3 - 3
src/rtppacketizer.cpp

@@ -25,8 +25,8 @@ namespace rtc {
 RtpPacketizer::RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
     : rtpConfig(rtpConfig) {}
 
-message_ptr RtpPacketizer::packetize(binary payload, bool setMark) {
-	auto msg = make_message(rtpHeaderSize + payload.size());
+binary_ptr RtpPacketizer::packetize(std::shared_ptr<binary> payload, bool setMark) {
+	auto msg = std::make_shared<binary>(rtpHeaderSize + payload->size());
 	auto *rtp = (RTP *)msg->data();
 	rtp->setPayloadType(rtpConfig->payloadType);
 	// increase sequence number
@@ -37,7 +37,7 @@ message_ptr RtpPacketizer::packetize(binary payload, bool setMark) {
 		rtp->setMarker(true);
 	}
 	rtp->preparePacket();
-	copy(payload.begin(), payload.end(), msg->begin() + rtpHeaderSize);
+	memcpy(msg->data() + rtpHeaderSize, payload->data(), payload->size());
 	return msg;
 }