Browse Source

Refactored the RTCP dispatching monstruosity

Paul-Louis Ageneau 4 years ago
parent
commit
519e81727a
2 changed files with 74 additions and 162 deletions
  1. 2 4
      include/rtc/peerconnection.hpp
  2. 72 158
      src/peerconnection.cpp

+ 2 - 4
include/rtc/peerconnection.hpp

@@ -139,8 +139,7 @@ private:
 	void forwardMessage(message_ptr message);
 	void forwardMessage(message_ptr message);
 	void forwardMedia(message_ptr message);
 	void forwardMedia(message_ptr message);
 	void forwardBufferedAmount(uint16_t stream, size_t amount);
 	void forwardBufferedAmount(uint16_t stream, size_t amount);
-	std::optional<std::string> getMidFromSSRC(SSRC ssrc);
-	std::optional<uint32_t> getMLineFromSSRC(SSRC ssrc);
+	std::optional<std::string> getMidFromSsrc(uint32_t ssrc);
 
 
 	std::shared_ptr<DataChannel> emplaceDataChannel(Description::Role role, string label,
 	std::shared_ptr<DataChannel> emplaceDataChannel(Description::Role role, string label,
 	                                                DataChannelInit init);
 	                                                DataChannelInit init);
@@ -186,8 +185,7 @@ private:
 	std::vector<std::weak_ptr<Track>> mTrackLines;                          // by SDP order
 	std::vector<std::weak_ptr<Track>> mTrackLines;                          // by SDP order
 	std::shared_mutex mDataChannelsMutex, mTracksMutex;
 	std::shared_mutex mDataChannelsMutex, mTracksMutex;
 
 
-	std::unordered_map<uint32_t, string> mMidFromSssrc;         // cache
-	std::unordered_map<uint32_t, unsigned int> mMLineFromSssrc; // cache
+	std::unordered_map<uint32_t, string> mMidFromSsrc; // cache
 
 
 	std::atomic<State> mState;
 	std::atomic<State> mState;
 	std::atomic<GatheringState> mGatheringState;
 	std::atomic<GatheringState> mGatheringState;

+ 72 - 158
src/peerconnection.cpp

@@ -32,16 +32,15 @@
 #endif
 #endif
 
 
 #include <iomanip>
 #include <iomanip>
+#include <set>
 #include <thread>
 #include <thread>
 
 
 #if __clang__
 #if __clang__
 namespace {
 namespace {
-
 template <typename To, typename From>
 template <typename To, typename From>
 inline std::shared_ptr<To> reinterpret_pointer_cast(std::shared_ptr<From> const &ptr) noexcept {
 inline std::shared_ptr<To> reinterpret_pointer_cast(std::shared_ptr<From> const &ptr) noexcept {
 	return std::shared_ptr<To>(ptr, reinterpret_cast<To *>(ptr.get()));
 	return std::shared_ptr<To>(ptr, reinterpret_cast<To *>(ptr.get()));
 }
 }
-
 } // namespace
 } // namespace
 #else
 #else
 using std::reinterpret_pointer_cast;
 using std::reinterpret_pointer_cast;
@@ -680,75 +679,26 @@ void PeerConnection::forwardMedia(message_ptr message) {
 
 
 	// Browsers like to compound their packets with a random SSRC.
 	// Browsers like to compound their packets with a random SSRC.
 	// we have to do this monstrosity to distribute the report blocks
 	// we have to do this monstrosity to distribute the report blocks
-	std::optional<unsigned int> mediaLine;
 	if (message->type == Message::Control) {
 	if (message->type == Message::Control) {
+		std::set<uint32_t> ssrcs;
 		size_t offset = 0;
 		size_t offset = 0;
-		std::vector<SSRC> ssrcsFound;
-		bool hasFound = false;
-
 		while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
 		while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
-			auto header = (rtc::RTCP_HEADER *)(message->data() + offset);
+			auto header = reinterpret_cast<rtc::RTCP_HEADER *>(message->data() + offset);
 			if (header->lengthInBytes() > message->size() - offset) {
 			if (header->lengthInBytes() > message->size() - offset) {
-				PLOG_WARNING << "Packet was truncated";
+				PLOG_WARNING << "RTCP packet is truncated";
 				break;
 				break;
 			}
 			}
 			offset += header->lengthInBytes();
 			offset += header->lengthInBytes();
 			if (header->payloadType() == 205 || header->payloadType() == 206) {
 			if (header->payloadType() == 205 || header->payloadType() == 206) {
-				auto rtcpfb = (RTCP_FB_HEADER *)header;
-				auto ssrc = rtcpfb->getPacketSenderSSRC();
-				if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
-					mediaLine = getMLineFromSSRC(ssrc);
-					if (mediaLine.has_value()) {
-						hasFound = true;
-						std::shared_lock lock(mTracksMutex); // read-only
-						if (auto track = mTrackLines[*mediaLine].lock()) {
-							track->incoming(message);
-						}
-						ssrcsFound.emplace_back(ssrc);
-					}
-				}
+				auto rtcpfb = reinterpret_cast<RTCP_FB_HEADER *>(header);
+				ssrcs.insert(rtcpfb->getPacketSenderSSRC());
+				ssrcs.insert(rtcpfb->getMediaSourceSSRC());
 
 
-				ssrc = rtcpfb->getMediaSourceSSRC();
-				if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
-					mediaLine = getMLineFromSSRC(ssrc);
-					if (mediaLine.has_value()) {
-						hasFound = true;
-						std::shared_lock lock(mTracksMutex); // read-only
-						if (auto track = mTrackLines[*mediaLine].lock()) {
-							track->incoming(message);
-						}
-						ssrcsFound.emplace_back(ssrc);
-					}
-				}
 			} else if (header->payloadType() == 200 || header->payloadType() == 201) {
 			} else if (header->payloadType() == 200 || header->payloadType() == 201) {
-				auto rtcpsr = (RTCP_SR *)header;
-				auto ssrc = rtcpsr->senderSSRC();
-				if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
-					mediaLine = getMLineFromSSRC(ssrc);
-					if (mediaLine.has_value()) {
-						hasFound = true;
-						std::shared_lock lock(mTracksMutex); // read-only
-						if (auto track = mTrackLines[*mediaLine].lock()) {
-							track->incoming(message);
-						}
-						ssrcsFound.emplace_back(ssrc);
-					}
-				}
-				for (int i = 0; i < rtcpsr->header.reportCount(); i++) {
-					auto block = rtcpsr->getReportBlock(i);
-					ssrc = block->getSSRC();
-					if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
-						mediaLine = getMLineFromSSRC(ssrc);
-						if (mediaLine.has_value()) {
-							hasFound = true;
-							std::shared_lock lock(mTracksMutex); // read-only
-							if (auto track = mTrackLines[*mediaLine].lock()) {
-								track->incoming(message);
-							}
-							ssrcsFound.emplace_back(ssrc);
-						}
-					}
-				}
+				auto rtcpsr = reinterpret_cast<RTCP_SR *>(header);
+				ssrcs.insert(rtcpsr->senderSSRC());
+				for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
+					ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
 			} else {
 			} else {
 				// PT=202 == SDES
 				// PT=202 == SDES
 				// PT=207 == Extended Report
 				// PT=207 == Extended Report
@@ -759,119 +709,84 @@ void PeerConnection::forwardMedia(message_ptr message) {
 			}
 			}
 		}
 		}
 
 
-		if (hasFound)
+		if (!ssrcs.empty()) {
+			for (uint32_t ssrc : ssrcs) {
+				if (auto mid = getMidFromSsrc(ssrc)) {
+					std::shared_lock lock(mTracksMutex); // read-only
+					if (auto it = mTracks.find(*mid); it != mTracks.end())
+						if (auto track = it->second.lock())
+							track->incoming(message);
+				}
+			}
 			return;
 			return;
+		}
 	}
 	}
 
 
-	unsigned int ssrc = message->stream;
-	mediaLine = getMLineFromSSRC(ssrc);
-
-	if (!mediaLine) {
-		/* TODO
-		 *   So the problem is that when stop sending streams, we stop getting report blocks for
+	uint32_t ssrc = uint32_t(message->stream);
+	if (auto mid = getMidFromSsrc(ssrc)) {
+		std::shared_lock lock(mTracksMutex); // read-only
+		if (auto it = mTracks.find(*mid); it != mTracks.end())
+			if (auto track = it->second.lock())
+				track->incoming(message);
+	} else {
+		/*
+		 * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
 		 * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
 		 * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
 		 * forward them. Therefore, it is expected that we don't know where to forward packets. Is
 		 * forward them. Therefore, it is expected that we don't know where to forward packets. Is
 		 * this ideal? No! Do I know how to fix it? No!
 		 * this ideal? No! Do I know how to fix it? No!
 		 */
 		 */
-		//	PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
+		// PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
 		return;
 		return;
 	}
 	}
+} // namespace rtc
 
 
-	std::shared_lock lock(mTracksMutex); // read-only
-	if (auto track = mTrackLines[*mediaLine].lock()) {
-		track->incoming(message);
-	}
-}
-
-std::optional<unsigned int> PeerConnection::getMLineFromSSRC(SSRC ssrc) {
-	if (auto it = mMLineFromSssrc.find(ssrc); it != mMLineFromSssrc.end()) {
+std::optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
+	if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
 		return it->second;
 		return it->second;
-	} else {
-		{
-			std::lock_guard lock(mRemoteDescriptionMutex);
-			if (!mRemoteDescription)
-				return nullopt;
-			for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
-				if (std::visit(
-				        rtc::overloaded{[&](Description::Application *) -> bool { return false; },
-				                        [&](Description::Media *media) -> bool {
-					                        return media->hasSSRC(ssrc);
-				                        }},
-				        mRemoteDescription->media(i))) {
-
-					mMLineFromSssrc.emplace(ssrc, i);
-					return i;
-				}
-			}
-		}
-		{
-			std::lock_guard lock(mLocalDescriptionMutex);
-			if (!mLocalDescription)
-				return nullopt;
-			for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
-				if (std::visit(
-				        rtc::overloaded{[&](Description::Application *) -> bool { return false; },
-				                        [&](Description::Media *media) -> bool {
-					                        return media->hasSSRC(ssrc);
-				                        }},
-				        mLocalDescription->media(i))) {
-
-					mMLineFromSssrc.emplace(ssrc, i);
-					return i;
-				}
-			}
-		}
-	}
-	return std::nullopt;
-}
 
 
-std::optional<std::string> PeerConnection::getMidFromSSRC(SSRC ssrc) {
-	if (auto it = mMidFromSssrc.find(ssrc); it != mMidFromSssrc.end()) {
-		return it->second;
-	} else {
-		{
-			std::lock_guard lock(mRemoteDescriptionMutex);
-			if (!mRemoteDescription)
-				return nullopt;
-			for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
-				if (auto found = std::visit(
-				        rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
-					                        return std::nullopt;
-				                        },
-				                        [&](Description::Media *media) -> std::optional<string> {
-					                        return media->hasSSRC(ssrc)
-					                                   ? std::make_optional(media->mid())
-					                                   : nullopt;
-				                        }},
-				        mRemoteDescription->media(i))) {
-
-					mMidFromSssrc.emplace(ssrc, *found);
-					return *found;
-				}
+	{
+		std::lock_guard lock(mRemoteDescriptionMutex);
+		if (!mRemoteDescription)
+			return nullopt;
+		for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
+			if (auto found = std::visit(
+			        rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
+				                        return std::nullopt;
+			                        },
+			                        [&](Description::Media *media) -> std::optional<string> {
+				                        return media->hasSSRC(ssrc)
+				                                   ? std::make_optional(media->mid())
+				                                   : nullopt;
+			                        }},
+			        mRemoteDescription->media(i))) {
+
+				mMidFromSsrc.emplace(ssrc, *found);
+				return *found;
 			}
 			}
 		}
 		}
-		{
-			std::lock_guard lock(mLocalDescriptionMutex);
-			if (!mLocalDescription)
-				return nullopt;
-			for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
-				if (auto found = std::visit(
-				        rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
-					                        return std::nullopt;
-				                        },
-				                        [&](Description::Media *media) -> std::optional<string> {
-					                        return media->hasSSRC(ssrc)
-					                                   ? std::make_optional(media->mid())
-					                                   : nullopt;
-				                        }},
-				        mLocalDescription->media(i))) {
-
-					mMidFromSssrc.emplace(ssrc, *found);
-					return *found;
-				}
+	}
+	{
+		std::lock_guard lock(mLocalDescriptionMutex);
+		if (!mLocalDescription)
+			return nullopt;
+		for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
+			if (auto found = std::visit(
+			        rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
+				                        return std::nullopt;
+			                        },
+			                        [&](Description::Media *media) -> std::optional<string> {
+				                        return media->hasSSRC(ssrc)
+				                                   ? std::make_optional(media->mid())
+				                                   : nullopt;
+			                        }},
+			        mLocalDescription->media(i))) {
+
+				mMidFromSsrc.emplace(ssrc, *found);
+				return *found;
 			}
 			}
 		}
 		}
 	}
 	}
+
 	return nullopt;
 	return nullopt;
 }
 }
 
 
@@ -1121,7 +1036,6 @@ void PeerConnection::processLocalDescription(Description description) {
 		}
 		}
 
 
 		// Add media for local tracks
 		// Add media for local tracks
-
 		std::shared_lock lock(mTracksMutex);
 		std::shared_lock lock(mTracksMutex);
 		for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
 		for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
 			if (auto track = it->lock()) {
 			if (auto track = it->lock()) {