Browse Source

Merge pull request #528 from paullouisageneau/fix-cleanup

Blocking cleanup
Paul-Louis Ageneau 3 years ago
parent
commit
881c6b69fa
10 changed files with 137 additions and 37 deletions
  1. 3 1
      DOC.md
  2. 2 1
      include/rtc/global.hpp
  3. 4 1
      pages/content/pages/reference.md
  4. 43 2
      src/capi.cpp
  5. 1 1
      src/global.cpp
  6. 27 6
      src/impl/init.cpp
  7. 3 1
      src/impl/init.hpp
  8. 1 2
      test/benchmark.cpp
  9. 9 0
      test/capi_websocketserver.cpp
  10. 44 22
      test/main.cpp

+ 3 - 1
DOC.md

@@ -50,7 +50,9 @@ An optional call to `rtcPreload` preloads the global resources used by the libra
 void rtcCleanup(void)
 ```
 
-An optional call to `rtcCleanup` requests unloading of the global resources used by the library. If all created PeerConnections are deleted, unloading will happen immediately and the call will block until unloading is done, otherwise unloading will happen as soon as the last PeerConnection is deleted. If resources are already unloaded, the call has no effect.
+An optional call to `rtcCleanup` unloads the global resources used by the library. The call will block until unloading is done. If Peer Connections, Data Channels, Tracks, or WebSockets created through this API still exist, they will be destroyed. If resources are already unloaded, the call has no effect.
+
+Warning: This function requires all Peer Connections, Data Channels, Tracks, and WebSockets to be destroyed before returning, meaning all callbacks must return before this function returns. Therefore, it must never be called from a callback.
 
 #### rtcSetUserPointer
 

+ 2 - 1
include/rtc/global.hpp

@@ -23,6 +23,7 @@
 
 #include <chrono>
 #include <iostream>
+#include <future>
 
 namespace rtc {
 
@@ -47,7 +48,7 @@ RTC_CPP_EXPORT void InitLogger(plog::Severity severity, plog::IAppender *appende
 #endif
 
 RTC_CPP_EXPORT void Preload();
-RTC_CPP_EXPORT void Cleanup();
+RTC_CPP_EXPORT std::shared_future<void> Cleanup();
 
 struct SctpSettings {
 	// For the following settings, not set means optimized default

+ 4 - 1
pages/content/pages/reference.md

@@ -53,7 +53,10 @@ An optional call to `rtcPreload` preloads the global resources used by the libra
 void rtcCleanup(void)
 ```
 
-An optional call to `rtcCleanup` requests unloading of the global resources used by the library. If all created PeerConnections are deleted, unloading will happen immediately and the call will block until unloading is done, otherwise unloading will happen as soon as the last PeerConnection is deleted. If resources are already unloaded, the call has no effect.
+An optional call to `rtcCleanup` unloads the global resources used by the library. The call will block until unloading is done. If Peer Connections, Data Channels, Tracks, or WebSockets created through this API still exist, they will be destroyed. If resources are already unloaded, the call has no effect.
+
+Warning: This function requires all Peer Connections, Data Channels, Tracks, and WebSockets to be destroyed before returning, meaning all callbacks must return before this function returns. Therefore, it must never be called from a callback.
+
 
 #### rtcSetUserPointer
 

+ 43 - 2
src/capi.cpp

@@ -29,6 +29,7 @@
 #include <utility>
 
 using namespace rtc;
+using namespace std::chrono_literals;
 using std::chrono::milliseconds;
 
 namespace {
@@ -134,6 +135,27 @@ void eraseTrack(int tr) {
 	userPointerMap.erase(tr);
 }
 
+size_t eraseAll() {
+	std::lock_guard lock(mutex);
+	size_t count = dataChannelMap.size() + trackMap.size() + peerConnectionMap.size();
+	dataChannelMap.clear();
+	trackMap.clear();
+	peerConnectionMap.clear();
+#if RTC_ENABLE_MEDIA
+	count += rtcpChainableHandlerMap.size() + rtcpSrReporterMap.size() + rtpConfigMap.size();
+	rtcpChainableHandlerMap.clear();
+	rtcpSrReporterMap.clear();
+	rtpConfigMap.clear();
+#endif
+#if RTC_ENABLE_WEBSOCKET
+	count += webSocketMap.size() + webSocketServerMap.size();
+	webSocketMap.clear();
+	webSocketServerMap.clear();
+#endif
+	userPointerMap.clear();
+	return count;
+}
+
 shared_ptr<Channel> getChannel(int id) {
 	std::lock_guard lock(mutex);
 	if (auto it = dataChannelMap.find(id); it != dataChannelMap.end())
@@ -1340,9 +1362,28 @@ RTC_EXPORT int rtcGetWebSocketServerPort(int wsserver) {
 
 #endif
 
-void rtcPreload() { rtc::Preload(); }
+void rtcPreload() {
+	try {
+		rtc::Preload();
+	} catch (const std::exception &e) {
+		PLOG_ERROR << e.what();
+	}
+}
+
+void rtcCleanup() {
+	try {
+		size_t count = eraseAll();
+		if(count != 0) {
+			PLOG_INFO << count << " objects were not properly destroyed before cleanup";
+		}
+
+		if(rtc::Cleanup().wait_for(10s) == std::future_status::timeout)
+			throw std::runtime_error("Cleanup timeout (possible deadlock or undestructible object)");
 
-void rtcCleanup() { rtc::Cleanup(); }
+	} catch (const std::exception &e) {
+		PLOG_ERROR << e.what();
+	}
+}
 
 int rtcSetSctpSettings(const rtcSctpSettings *settings) {
 	return wrap([&] {

+ 1 - 1
src/global.cpp

@@ -103,7 +103,7 @@ void InitLogger(plog::Severity severity, plog::IAppender *appender) {
 }
 
 void Preload() { Init::Instance().preload(); }
-void Cleanup() { Init::Instance().cleanup(); }
+std::shared_future<void> Cleanup() { return Init::Instance().cleanup(); }
 
 void SetSctpSettings(SctpSettings s) { Init::Instance().setSctpSettings(std::move(s)); }
 

+ 27 - 6
src/impl/init.cpp

@@ -40,12 +40,28 @@
 namespace rtc {
 
 struct Init::TokenPayload {
-	TokenPayload() { Init::Instance().doInit(); }
+	TokenPayload(std::shared_future<void> *cleanupFuture) {
+		Init::Instance().doInit();
+		if(cleanupFuture)
+			*cleanupFuture = cleanupPromise.get_future().share();
+	}
 
 	~TokenPayload() {
-		std::thread t([]() { Init::Instance().doCleanup(); });
+		std::thread t(
+		    [](std::promise<void> promise) {
+			    try {
+				    Init::Instance().doCleanup();
+				    promise.set_value();
+			    } catch (const std::exception &e) {
+				    PLOG_WARNING << e.what();
+				    promise.set_exception(std::make_exception_ptr(e));
+			    }
+		    },
+		    std::move(cleanupPromise));
 		t.detach();
 	}
+
+	std::promise<void> cleanupPromise;
 };
 
 Init &Init::Instance() {
@@ -53,7 +69,11 @@ Init &Init::Instance() {
 	return *instance;
 }
 
-Init::Init() {}
+Init::Init() {
+	std::promise<void> p;
+    p.set_value();
+    mCleanupFuture = p.get_future(); // make it ready
+}
 
 Init::~Init() {}
 
@@ -62,7 +82,7 @@ init_token Init::token() {
 	if (auto locked = mWeak.lock())
 		return locked;
 
-	mGlobal = std::make_shared<TokenPayload>();
+	mGlobal = std::make_shared<TokenPayload>(&mCleanupFuture);
 	mWeak = *mGlobal;
 	return *mGlobal;
 }
@@ -70,14 +90,15 @@ init_token Init::token() {
 void Init::preload() {
 	std::lock_guard lock(mMutex);
 	if (!mGlobal) {
-		mGlobal = std::make_shared<TokenPayload>();
+		mGlobal = std::make_shared<TokenPayload>(&mCleanupFuture);
 		mWeak = *mGlobal;
 	}
 }
 
-void Init::cleanup() {
+std::shared_future<void> Init::cleanup() {
 	std::lock_guard lock(mMutex);
 	mGlobal.reset();
+	return mCleanupFuture;
 }
 
 void Init::setSctpSettings(SctpSettings s) {

+ 3 - 1
src/impl/init.hpp

@@ -24,6 +24,7 @@
 
 #include <chrono>
 #include <mutex>
+#include <future>
 
 namespace rtc {
 
@@ -40,7 +41,7 @@ public:
 
 	init_token token();
 	void preload();
-	void cleanup();
+	std::shared_future<void> cleanup();
 	void setSctpSettings(SctpSettings s);
 
 private:
@@ -55,6 +56,7 @@ private:
 	bool mInitialized = false;
 	SctpSettings mCurrentSctpSettings = {};
 	std::mutex mMutex;
+	std::shared_future<void> mCleanupFuture;
 
 	struct TokenPayload;
 };

+ 1 - 2
test/benchmark.cpp

@@ -121,7 +121,7 @@ size_t benchmark(milliseconds duration) {
 			}
 		} catch (const std::exception &e) {
 			std::cout << "Send failed: " << e.what() << std::endl;
-		}		
+		}
 	});
 
 	// When sent data is buffered in the DataChannel,
@@ -168,7 +168,6 @@ size_t benchmark(milliseconds duration) {
 	pc2.close();
 
 	rtc::Cleanup();
-	this_thread::sleep_for(1s);
 	return goodput;
 }
 

+ 9 - 0
test/capi_websocketserver.cpp

@@ -35,6 +35,7 @@ static const char *MESSAGE = "Hello, this is a C API WebSocket test!";
 
 static bool success = false;
 static bool failed = false;
+static int wsclient = -1;
 
 static void RTC_API openCallback(int ws, void *ptr) {
 	printf("WebSocket: Connection open\n");
@@ -86,6 +87,8 @@ static void RTC_API serverMessageCallback(int ws, const char *message, int size,
 }
 
 static void RTC_API serverClientCallback(int wsserver, int ws, void *ptr) {
+	wsclient = ws;
+
 	char address[256];
 	if (rtcGetWebSocketRemoteAddress(ws, address, 256) < 0) {
 		fprintf(stderr, "rtcGetWebSocketRemoteAddress failed\n");
@@ -144,6 +147,9 @@ int test_capi_websocketserver_main() {
 	if (failed)
 		goto error;
 
+	rtcDeleteWebSocket(wsclient);
+	sleep(1);
+
 	rtcDeleteWebSocket(ws);
 	sleep(1);
 
@@ -154,6 +160,9 @@ int test_capi_websocketserver_main() {
 	return 0;
 
 error:
+	if (wsclient >= 0)
+		rtcDeleteWebSocket(wsclient);
+
 	if (ws >= 0)
 		rtcDeleteWebSocket(ws);
 

+ 44 - 22
test/main.cpp

@@ -20,6 +20,8 @@
 #include <iostream>
 #include <thread>
 
+#include <rtc/rtc.hpp>
+
 using namespace std;
 using namespace chrono_literals;
 
@@ -45,6 +47,7 @@ void test_benchmark() {
 }
 
 int main(int argc, char **argv) {
+	// C++ API tests
 	try {
 		cout << endl << "*** Running WebRTC connectivity test..." << endl;
 		test_connectivity();
@@ -53,7 +56,6 @@ int main(int argc, char **argv) {
 		cerr << "WebRTC connectivity test failed: " << e.what() << endl;
 		return -1;
 	}
-	this_thread::sleep_for(1s);
 	try {
 		cout << endl << "*** Running WebRTC TURN connectivity test..." << endl;
 		test_turn_connectivity();
@@ -62,17 +64,7 @@ int main(int argc, char **argv) {
 		cerr << "WebRTC TURN connectivity test failed: " << e.what() << endl;
 		return -1;
 	}
-	this_thread::sleep_for(1s);
-	try {
-		cout << endl << "*** Running WebRTC C API connectivity test..." << endl;
-		test_capi_connectivity();
-		cout << "*** Finished WebRTC C API connectivity test" << endl;
-	} catch (const exception &e) {
-		cerr << "WebRTC C API connectivity test failed: " << e.what() << endl;
-		return -1;
-	}
 #if RTC_ENABLE_MEDIA
-	this_thread::sleep_for(1s);
 	try {
 		cout << endl << "*** Running WebRTC Track test..." << endl;
 		test_track();
@@ -81,19 +73,10 @@ int main(int argc, char **argv) {
 		cerr << "WebRTC Track test failed: " << e.what() << endl;
 		return -1;
 	}
-	try {
-		cout << endl << "*** Running WebRTC C API track test..." << endl;
-		test_capi_track();
-		cout << "*** Finished WebRTC C API track test" << endl;
-	} catch (const exception &e) {
-		cerr << "WebRTC C API track test failed: " << e.what() << endl;
-		return -1;
-	}
 #endif
 #if RTC_ENABLE_WEBSOCKET
 // TODO: Temporarily disabled as the echo service is unreliable
 /*
-	this_thread::sleep_for(1s);
 	try {
 		cout << endl << "*** Running WebSocket test..." << endl;
 		test_websocket();
@@ -111,6 +94,38 @@ int main(int argc, char **argv) {
 		cerr << "WebSocketServer test failed: " << e.what() << endl;
 		return -1;
 	}
+#endif
+	try {
+		// Every created object must have been destroyed, otherwise the wait will block
+		cout << endl << "*** Running cleanup..." << endl;
+		if(rtc::Cleanup().wait_for(10s) == future_status::timeout)
+			throw std::runtime_error("Timeout");
+		cout << "*** Finished cleanup..." << endl;
+	} catch (const exception &e) {
+		cerr << "Cleanup failed: " << e.what() << endl;
+		return -1;
+	}
+
+	// C API tests
+	try {
+		cout << endl << "*** Running WebRTC C API connectivity test..." << endl;
+		test_capi_connectivity();
+		cout << "*** Finished WebRTC C API connectivity test" << endl;
+	} catch (const exception &e) {
+		cerr << "WebRTC C API connectivity test failed: " << e.what() << endl;
+		return -1;
+	}
+#if RTC_ENABLE_MEDIA
+	try {
+		cout << endl << "*** Running WebRTC C API track test..." << endl;
+		test_capi_track();
+		cout << "*** Finished WebRTC C API track test" << endl;
+	} catch (const exception &e) {
+		cerr << "WebRTC C API track test failed: " << e.what() << endl;
+		return -1;
+	}
+#endif
+#if RTC_ENABLE_WEBSOCKET
 	try {
 		cout << endl << "*** Running WebSocketServer C API test..." << endl;
 		test_capi_websocketserver();
@@ -120,8 +135,16 @@ int main(int argc, char **argv) {
 		return -1;
 	}
 #endif
+	try {
+		cout << endl << "*** Running C API cleanup..." << endl;
+		rtcCleanup();
+		cout << "*** Finished C API cleanup..." << endl;
+	} catch (const exception &e) {
+		cerr << "C API cleanup failed: " << e.what() << endl;
+		return -1;
+	}
 /*
-    this_thread::sleep_for(1s);
+	// Benchmark
 	try {
 		cout << endl << "*** Running WebRTC benchmark..." << endl;
 		test_benchmark();
@@ -132,6 +155,5 @@ int main(int argc, char **argv) {
 		return -1;
 	}
 */
-	std::this_thread::sleep_for(1s);
 	return 0;
 }