Browse Source

Merge pull request #1486 from paullouisageneau/thread-pool-size-api

Add SetThreadPoolSize to configure the global thread count
Paul-Louis Ageneau 1 week ago
parent
commit
faed3cf9fb
8 changed files with 61 additions and 35 deletions
  1. 5 2
      include/rtc/global.hpp
  2. 7 5
      include/rtc/rtc.h
  3. 31 23
      src/capi.cpp
  4. 3 2
      src/global.cpp
  5. 8 2
      src/impl/init.cpp
  6. 3 0
      src/impl/init.hpp
  7. 1 1
      src/impl/internals.hpp
  8. 3 0
      test/main.cpp

+ 5 - 2
include/rtc/global.hpp

@@ -31,8 +31,7 @@ typedef std::function<void(LogLevel level, string message)> LogCallback;
 
 
 RTC_CPP_EXPORT void InitLogger(LogLevel level, LogCallback callback = nullptr);
 RTC_CPP_EXPORT void InitLogger(LogLevel level, LogCallback callback = nullptr);
 
 
-RTC_CPP_EXPORT void Preload();
-RTC_CPP_EXPORT std::shared_future<void> Cleanup();
+RTC_CPP_EXPORT void SetThreadPoolSize(unsigned int count); // 0: hardware concurrency
 
 
 struct SctpSettings {
 struct SctpSettings {
 	// For the following settings, not set means optimized default
 	// For the following settings, not set means optimized default
@@ -52,6 +51,10 @@ struct SctpSettings {
 
 
 RTC_CPP_EXPORT void SetSctpSettings(SctpSettings s);
 RTC_CPP_EXPORT void SetSctpSettings(SctpSettings s);
 
 
+// Optional global preload and cleanup
+RTC_CPP_EXPORT void Preload();
+RTC_CPP_EXPORT std::shared_future<void> Cleanup();
+
 RTC_CPP_EXPORT std::ostream &operator<<(std::ostream &out, LogLevel level);
 RTC_CPP_EXPORT std::ostream &operator<<(std::ostream &out, LogLevel level);
 
 
 } // namespace rtc
 } // namespace rtc

+ 7 - 5
include/rtc/rtc.h

@@ -513,12 +513,10 @@ RTC_C_EXPORT int rtcGetWebSocketServerPort(int wsserver);
 
 
 #endif
 #endif
 
 
-// Optional global preload and cleanup
-
-RTC_C_EXPORT void rtcPreload(void);
-RTC_C_EXPORT void rtcCleanup(void);
+// Global settings
 
 
-// SCTP global settings
+// Note: Applied when threads are spawned
+RTC_C_EXPORT int rtcSetThreadPoolSize(unsigned int count);
 
 
 typedef struct {
 typedef struct {
 	int recvBufferSize;          // in bytes, <= 0 means optimized default
 	int recvBufferSize;          // in bytes, <= 0 means optimized default
@@ -538,6 +536,10 @@ typedef struct {
 // Note: SCTP settings apply to newly-created PeerConnections only
 // Note: SCTP settings apply to newly-created PeerConnections only
 RTC_C_EXPORT int rtcSetSctpSettings(const rtcSctpSettings *settings);
 RTC_C_EXPORT int rtcSetSctpSettings(const rtcSctpSettings *settings);
 
 
+// Optional global preload and cleanup
+RTC_C_EXPORT void rtcPreload(void);
+RTC_C_EXPORT void rtcCleanup(void);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 } // extern "C"
 } // extern "C"
 #endif
 #endif

+ 31 - 23
src/capi.cpp

@@ -1096,7 +1096,7 @@ int rtcAddTrackEx(int pc, const rtcTrackInit *init) {
 		case RTC_CODEC_OPUS:
 		case RTC_CODEC_OPUS:
 		case RTC_CODEC_PCMU:
 		case RTC_CODEC_PCMU:
 		case RTC_CODEC_PCMA:
 		case RTC_CODEC_PCMA:
-		case RTC_CODEC_AAC: 
+		case RTC_CODEC_AAC:
 		case RTC_CODEC_G722: {
 		case RTC_CODEC_G722: {
 			auto audio = std::make_unique<Description::Audio>(mid, direction);
 			auto audio = std::make_unique<Description::Audio>(mid, direction);
 			switch (init->codec) {
 			switch (init->codec) {
@@ -1687,28 +1687,11 @@ int rtcGetWebSocketServerPort(int wsserver) {
 
 
 #endif
 #endif
 
 
-void rtcPreload() {
-	try {
-		rtc::Preload();
-	} catch (const std::exception &e) {
-		PLOG_ERROR << e.what();
-	}
-}
-
-void rtcCleanup() {
-	try {
-		size_t count = eraseAll();
-		if (count != 0) {
-			PLOG_INFO << count << " objects were not properly destroyed before cleanup";
-		}
-
-		if (rtc::Cleanup().wait_for(10s) == std::future_status::timeout)
-			throw std::runtime_error(
-			    "Cleanup timeout (possible deadlock or undestructible object)");
-
-	} catch (const std::exception &e) {
-		PLOG_ERROR << e.what();
-	}
+int rtcSetThreadPoolSize(unsigned int count) {
+	return wrap([&] {
+		SetThreadPoolSize(count);
+		return RTC_ERR_SUCCESS;
+	});
 }
 }
 
 
 int rtcSetSctpSettings(const rtcSctpSettings *settings) {
 int rtcSetSctpSettings(const rtcSctpSettings *settings) {
@@ -1759,3 +1742,28 @@ int rtcSetSctpSettings(const rtcSctpSettings *settings) {
 		return RTC_ERR_SUCCESS;
 		return RTC_ERR_SUCCESS;
 	});
 	});
 }
 }
+
+void rtcPreload() {
+	try {
+		rtc::Preload();
+	} catch (const std::exception &e) {
+		PLOG_ERROR << e.what();
+	}
+}
+
+void rtcCleanup() {
+	try {
+		size_t count = eraseAll();
+		if (count != 0) {
+			PLOG_INFO << count << " objects were not properly destroyed before cleanup";
+		}
+
+		if (rtc::Cleanup().wait_for(10s) == std::future_status::timeout)
+			throw std::runtime_error(
+			    "Cleanup timeout (possible deadlock or undestructible object)");
+
+	} catch (const std::exception &e) {
+		PLOG_ERROR << e.what();
+	}
+}
+

+ 3 - 2
src/global.cpp

@@ -83,11 +83,12 @@ void InitLogger(plog::Severity severity, plog::IAppender *appender) {
 	plogInit(severity, appender);
 	plogInit(severity, appender);
 }
 }
 
 
+void SetThreadPoolSize(unsigned int count) { impl::Init::Instance().setThreadPoolSize(count); }
+void SetSctpSettings(SctpSettings s) { impl::Init::Instance().setSctpSettings(std::move(s)); }
+
 void Preload() { impl::Init::Instance().preload(); }
 void Preload() { impl::Init::Instance().preload(); }
 std::shared_future<void> Cleanup() { return impl::Init::Instance().cleanup(); }
 std::shared_future<void> Cleanup() { return impl::Init::Instance().cleanup(); }
 
 
-void SetSctpSettings(SctpSettings s) { impl::Init::Instance().setSctpSettings(std::move(s)); }
-
 std::ostream &operator<<(std::ostream &out, LogLevel level) {
 std::ostream &operator<<(std::ostream &out, LogLevel level) {
 	switch (level) {
 	switch (level) {
 	case LogLevel::Fatal:
 	case LogLevel::Fatal:

+ 8 - 2
src/impl/init.cpp

@@ -96,6 +96,12 @@ std::shared_future<void> Init::cleanup() {
 	return mCleanupFuture;
 	return mCleanupFuture;
 }
 }
 
 
+void Init::setThreadPoolSize(unsigned int count) {
+	std::lock_guard lock(mMutex);
+	mThreadPoolSize = count;
+
+}
+
 void Init::setSctpSettings(SctpSettings s) {
 void Init::setSctpSettings(SctpSettings s) {
 	std::lock_guard lock(mMutex);
 	std::lock_guard lock(mMutex);
 	if (mGlobal)
 	if (mGlobal)
@@ -118,8 +124,8 @@ void Init::doInit() {
 		throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError()));
 		throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError()));
 #endif
 #endif
 
 
-	int concurrency = std::thread::hardware_concurrency();
-	int count = std::max(concurrency, MIN_THREADPOOL_SIZE);
+	unsigned int count = mThreadPoolSize > 0 ? mThreadPoolSize : std::thread::hardware_concurrency();
+	count = std::max(count, MIN_THREADPOOL_SIZE);
 	PLOG_DEBUG << "Spawning " << count << " threads";
 	PLOG_DEBUG << "Spawning " << count << " threads";
 	ThreadPool::Instance().spawn(count);
 	ThreadPool::Instance().spawn(count);
 
 

+ 3 - 0
src/impl/init.hpp

@@ -32,6 +32,8 @@ public:
 	init_token token();
 	init_token token();
 	void preload();
 	void preload();
 	std::shared_future<void> cleanup();
 	std::shared_future<void> cleanup();
+
+	void setThreadPoolSize(unsigned int count);
 	void setSctpSettings(SctpSettings s);
 	void setSctpSettings(SctpSettings s);
 
 
 private:
 private:
@@ -45,6 +47,7 @@ private:
 	weak_ptr<void> mWeak;
 	weak_ptr<void> mWeak;
 	bool mInitialized = false;
 	bool mInitialized = false;
 	SctpSettings mCurrentSctpSettings = {};
 	SctpSettings mCurrentSctpSettings = {};
+	unsigned int mThreadPoolSize = 0;
 	std::mutex mMutex;
 	std::mutex mMutex;
 	std::shared_future<void> mCleanupFuture;
 	std::shared_future<void> mCleanupFuture;
 
 

+ 1 - 1
src/impl/internals.hpp

@@ -45,7 +45,7 @@ const size_t DEFAULT_WS_MAX_MESSAGE_SIZE = 256 * 1024;   // Default max message
 
 
 const size_t RECV_QUEUE_LIMIT = 1024; // Max per-channel queue size (messages)
 const size_t RECV_QUEUE_LIMIT = 1024; // Max per-channel queue size (messages)
 
 
-const int MIN_THREADPOOL_SIZE = 4; // Minimum number of threads in the global thread pool (>= 2)
+const unsigned int MIN_THREADPOOL_SIZE = 2; // Minimum number of threads in the global thread pool (>= 2)
 
 
 const size_t DEFAULT_MTU = RTC_DEFAULT_MTU; // defined in rtc.h
 const size_t DEFAULT_MTU = RTC_DEFAULT_MTU; // defined in rtc.h
 
 

+ 3 - 0
test/main.cpp

@@ -96,6 +96,8 @@ static const vector<Test> tests = {
 };
 };
 
 
 int main(int argc, char **argv) {
 int main(int argc, char **argv) {
+	rtc::SetThreadPoolSize(4);
+
 	int success_tests = 0;
 	int success_tests = 0;
 	int failed_tests = 0;
 	int failed_tests = 0;
 	steady_clock::time_point startTime, endTime;
 	steady_clock::time_point startTime, endTime;
@@ -118,6 +120,7 @@ int main(int argc, char **argv) {
 	cout << "Finished " << success_tests + failed_tests << " tests in " << durationS.count()
 	cout << "Finished " << success_tests + failed_tests << " tests in " << durationS.count()
 	     << "s (" << durationMs.count() << " ms). Succeeded: " << success_tests
 	     << "s (" << durationMs.count() << " ms). Succeeded: " << success_tests
 	     << ". Failed: " << failed_tests << "." << endl;
 	     << ". Failed: " << failed_tests << "." << endl;
+
 	/*
 	/*
 	    // Benchmark
 	    // Benchmark
 	    try {
 	    try {