Browse Source

Merge pull request #678 from SE2Dev/interceptor

Add Media Interceptor API
Paul-Louis Ageneau 2 years ago
parent
commit
4cf9869aca

+ 8 - 0
include/rtc/message.hpp

@@ -60,6 +60,14 @@ RTC_CPP_EXPORT message_ptr make_message(binary &&data, Message::Type type = Mess
 
 RTC_CPP_EXPORT message_ptr make_message(message_variant data);
 
+#if RTC_ENABLE_MEDIA
+
+// Reconstructs a message_ptr from an opaque rtcMessage pointer that
+// was allocated by rtcCreateOpaqueMessage().
+message_ptr make_message_from_opaque_ptr(rtcMessage *&&message);
+
+#endif
+
 RTC_CPP_EXPORT message_variant to_variant(Message &&message);
 RTC_CPP_EXPORT message_variant to_variant(const Message &message);
 

+ 3 - 0
include/rtc/peerconnection.hpp

@@ -82,6 +82,9 @@ public:
 	void setRemoteDescription(Description description);
 	void addRemoteCandidate(Candidate candidate);
 
+	void setMediaHandler(shared_ptr<MediaHandler> handler);
+	shared_ptr<MediaHandler> getMediaHandler();
+
 	[[nodiscard]] shared_ptr<DataChannel> createDataChannel(string label, DataChannelInit init = {});
 	void onDataChannel(std::function<void(std::shared_ptr<DataChannel> dataChannel)> callback);
 

+ 16 - 0
include/rtc/rtc.h

@@ -139,6 +139,8 @@ typedef void(RTC_API *rtcOpenCallbackFunc)(int id, void *ptr);
 typedef void(RTC_API *rtcClosedCallbackFunc)(int id, void *ptr);
 typedef void(RTC_API *rtcErrorCallbackFunc)(int id, const char *error, void *ptr);
 typedef void(RTC_API *rtcMessageCallbackFunc)(int id, const char *message, int size, void *ptr);
+typedef void *(RTC_API *rtcInterceptorCallbackFunc)(int pc, const char *message, int size,
+                                                    void *ptr);
 typedef void(RTC_API *rtcBufferedAmountLowCallbackFunc)(int id, void *ptr);
 typedef void(RTC_API *rtcAvailableCallbackFunc)(int id, void *ptr);
 
@@ -303,6 +305,20 @@ typedef struct {
 	const char *trackId; // optional, track ID used in MSID
 } rtcSsrcForTypeInit;
 
+// Opaque message
+
+// Opaque type used (via rtcMessage*) to reference an rtc::Message
+typedef void* rtcMessage;
+
+// Allocate a new opaque message.
+// Must be explicitly freed by rtcDeleteOpaqueMessage() unless
+// explicitly returned by a media interceptor callback;
+RTC_EXPORT rtcMessage *rtcCreateOpaqueMessage(void *data, int size);
+RTC_EXPORT void rtcDeleteOpaqueMessage(rtcMessage *msg);
+
+// Set MediaInterceptor for peer connection
+RTC_EXPORT int rtcSetMediaInterceptorCallback(int id, rtcInterceptorCallbackFunc cb);
+
 // Set H264PacketizationHandler for track
 RTC_C_EXPORT int rtcSetH264PacketizationHandler(int tr, const rtcPacketizationHandlerInit *init);
 

+ 70 - 0
src/capi.cpp

@@ -297,6 +297,43 @@ createRtpPacketizationConfig(const rtcPacketizationHandlerInit *init) {
 	return config;
 }
 
+class MediaInterceptor final : public MediaHandler {
+public:
+	using MessageCallback = std::function<void *(void *data, int size)>;
+
+	MediaInterceptor(MessageCallback cb) : incomingCallback(cb) {}
+
+	// Called when there is traffic coming from the peer
+	message_ptr incoming(message_ptr msg) override {
+		// If no callback is provided, just forward the message on
+		if (!incomingCallback) {
+			return msg;
+		}
+
+		auto res = incomingCallback(reinterpret_cast<void *>(msg->data()), msg->size());
+
+		// If a null pointer was returned, drop the incoming message
+		if (res == nullptr) {
+			return nullptr;
+		}
+
+		// If the original data pointer was returned, forward the incoming message
+		if (res == msg->data()) {
+			return msg;
+		}
+
+		// Construct a true message_ptr from the returned opaque pointer
+		return make_message_from_opaque_ptr(std::move(reinterpret_cast<rtcMessage *>(res)));
+	};
+
+	// Called when there is traffic that needs to be sent to the peer
+	// This is a no-op for media interceptors
+	message_ptr outgoing(message_ptr ptr) override { return ptr; };
+
+private:
+	MessageCallback incomingCallback;
+};
+
 #endif // RTC_ENABLE_MEDIA
 
 #if RTC_ENABLE_WEBSOCKET
@@ -1105,6 +1142,39 @@ void setSSRC(Description::Media *description, uint32_t ssrc, const char *_name,
 	description->addSSRC(ssrc, name, msid, trackID);
 }
 
+rtcMessage *rtcCreateOpaqueMessage(void *data, int size) {
+	auto src = reinterpret_cast<std::byte *>(data);
+	auto msg = new Message(src, src + size);
+	// Downgrade the message pointer to the opaque rtcMessage* type
+	return reinterpret_cast<rtcMessage *>(msg);
+}
+
+void rtcDeleteOpaqueMessage(rtcMessage *msg) {
+	// Cast the opaque pointer back to it's true type before deleting
+	delete reinterpret_cast<Message *>(msg);
+}
+
+int rtcSetMediaInterceptorCallback(int pc, rtcInterceptorCallbackFunc cb) {
+	return wrap([&] {
+		auto peerConnection = getPeerConnection(pc);
+
+		if (cb == nullptr) {
+			peerConnection->setMediaHandler(nullptr);
+			return RTC_ERR_SUCCESS;
+		}
+
+		auto interceptor = std::make_shared<MediaInterceptor>([pc, cb](void *data, int size) {
+			if (auto ptr = getUserPointer(pc))
+				return cb(pc, reinterpret_cast<const char *>(data), size, *ptr);
+			return data;
+		});
+
+		peerConnection->setMediaHandler(interceptor);
+
+		return RTC_ERR_SUCCESS;
+	});
+}
+
 int rtcSetH264PacketizationHandler(int tr, const rtcPacketizationHandlerInit *init) {
 	return wrap([&] {
 		auto track = getTrack(tr);

+ 22 - 1
src/impl/peerconnection.cpp

@@ -349,7 +349,8 @@ void PeerConnection::closeTransports() {
 	if (!changeState(State::Closed))
 		return; // already closed
 
-	// Reset callbacks now that state is changed
+	// Reset intercceptor and callbacks now that state is changed
+	setMediaHandler(nullptr);
 	resetCallbacks();
 
 	// Pass the pointers to a thread, allowing to terminate a transport from its own thread
@@ -475,6 +476,14 @@ void PeerConnection::forwardMedia(message_ptr message) {
 	if (!message)
 		return;
 
+	auto handler = getMediaHandler();
+
+	if (handler) {
+		message = handler->incoming(message);
+		if (!message)
+			return;
+	}
+
 	// Browsers like to compound their packets with a random SSRC.
 	// we have to do this monstrosity to distribute the report blocks
 	if (message->type == Message::Control) {
@@ -1069,6 +1078,18 @@ string PeerConnection::localBundleMid() const {
 	return mLocalDescription ? mLocalDescription->bundleMid() : "0";
 }
 
+void PeerConnection::setMediaHandler(shared_ptr<MediaHandler> handler) {
+	std::unique_lock lock(mMediaHandlerMutex);
+	if (mMediaHandler)
+		mMediaHandler->onOutgoing(nullptr);
+	mMediaHandler = handler;
+}
+
+shared_ptr<MediaHandler> PeerConnection::getMediaHandler() {
+	std::shared_lock lock(mMediaHandlerMutex);
+	return mMediaHandler;
+}
+
 void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 	auto dataChannel = weakDataChannel.lock();
 	if (dataChannel) {

+ 7 - 0
src/impl/peerconnection.hpp

@@ -79,6 +79,9 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
 	void processRemoteCandidate(Candidate candidate);
 	string localBundleMid() const;
 
+	void setMediaHandler(shared_ptr<MediaHandler> handler);
+	shared_ptr<MediaHandler> getMediaHandler();
+
 	void triggerDataChannel(weak_ptr<DataChannel> weakDataChannel);
 	void triggerTrack(weak_ptr<Track> weakTrack);
 
@@ -126,6 +129,10 @@ private:
 	optional<Description> mCurrentLocalDescription;
 	mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
 
+	shared_ptr<MediaHandler> mMediaHandler;
+
+	mutable std::shared_mutex mMediaHandlerMutex;
+
 	shared_ptr<IceTransport> mIceTransport;
 	shared_ptr<DtlsTransport> mDtlsTransport;
 	shared_ptr<SctpTransport> mSctpTransport;

+ 9 - 0
src/message.cpp

@@ -38,6 +38,15 @@ message_ptr make_message(message_variant data) {
 	    std::move(data));
 }
 
+#if RTC_ENABLE_MEDIA
+
+message_ptr make_message_from_opaque_ptr(rtcMessage *&&message) {
+	auto ptr = std::unique_ptr<Message>(reinterpret_cast<Message *>(message));
+	return message_ptr(std::move(ptr));
+}
+
+#endif
+
 message_variant to_variant(Message &&message) {
 	switch (message.type) {
 	case Message::String:

+ 6 - 0
src/peerconnection.cpp

@@ -239,6 +239,12 @@ void PeerConnection::addRemoteCandidate(Candidate candidate) {
 	impl()->processRemoteCandidate(std::move(candidate));
 }
 
+void PeerConnection::setMediaHandler(shared_ptr<MediaHandler> handler) {
+	impl()->setMediaHandler(std::move(handler));
+};
+
+shared_ptr<MediaHandler> PeerConnection::getMediaHandler() { return impl()->getMediaHandler(); };
+
 optional<string> PeerConnection::localAddress() const {
 	auto iceTransport = impl()->getIceTransport();
 	return iceTransport ? iceTransport->getLocalAddress() : nullopt;