Browse Source

Implemented Semaphore using pthreads on Linux and Grand Central Dispatch on macOS (#1462)

This generates far fewer stalls than the mutex/condition_variable construct. Semaphore::Release goes from 30 µs per call to 1 µs per call. On Linux in particular, this speeds up the PerformanceTest by 5-20% at higher thread counts.
Jorrit Rouwe 7 months ago
parent
commit
af77fa8e8e
2 changed files with 71 additions and 24 deletions
  1. 41 5
      Jolt/Core/Semaphore.cpp
  2. 30 19
      Jolt/Core/Semaphore.h

+ 41 - 5
Jolt/Core/Semaphore.cpp

@@ -17,7 +17,6 @@
 #else
 	#include <windows.h>
 #endif
-
 	JPH_SUPPRESS_WARNING_POP
 #endif
 
@@ -27,6 +26,25 @@ Semaphore::Semaphore()
 { // Create the native semaphore object for the current platform with an initial count of 0; abort if creation fails
 #ifdef JPH_PLATFORM_WINDOWS
 	mSemaphore = CreateSemaphore(nullptr, 0, INT_MAX, nullptr); // Default security attributes, initial count 0, maximum count INT_MAX, unnamed
+	if (mSemaphore == nullptr) // CreateSemaphore signals failure by returning a null handle
+	{
+		Trace("Failed to create semaphore");
+		std::abort();
+	}
+#elif defined(JPH_USE_PTHREADS)
+	int ret = sem_init(&mSemaphore, 0, 0); // pshared = 0 (shared between threads of this process only), initial value 0
+	if (ret == -1) // sem_init returns -1 on error
+	{
+		Trace("Failed to create semaphore");
+		std::abort();
+	}
+#elif defined(JPH_USE_GRAND_CENTRAL_DISPATCH)
+	mSemaphore = dispatch_semaphore_create(0); // Initial value 0
+	if (mSemaphore == nullptr) // dispatch_semaphore_create returns NULL on failure
+	{
+		Trace("Failed to create semaphore");
+		std::abort();
+	}
 #endif // No branch for other platforms: nothing is created here for them
 }
 
@@ -34,6 +52,10 @@ Semaphore::~Semaphore()
 { // Destroy the native semaphore object that the constructor created
 #ifdef JPH_PLATFORM_WINDOWS
 	CloseHandle(mSemaphore); // Closes the handle returned by CreateSemaphore
+#elif defined(JPH_USE_PTHREADS)
+	sem_destroy(&mSemaphore); // Counterpart of the sem_init call in the constructor
+#elif defined(JPH_USE_GRAND_CENTRAL_DISPATCH)
+	dispatch_release(mSemaphore); // Drops the reference obtained from dispatch_semaphore_create
 #endif // No branch for other platforms: nothing to destroy explicitly here
 }
 
@@ -41,13 +63,21 @@ void Semaphore::Release(uint inNumber)
 {
 	JPH_ASSERT(inNumber > 0);
 
-#ifdef JPH_PLATFORM_WINDOWS
-	int old_value = mCount.fetch_add(inNumber);
+#if defined(JPH_PLATFORM_WINDOWS) || defined(JPH_USE_PTHREADS) || defined(JPH_USE_GRAND_CENTRAL_DISPATCH)
+	int old_value = mCount.fetch_add(inNumber, std::memory_order_release);
 	if (old_value < 0)
 	{
 		int new_value = old_value + (int)inNumber;
 		int num_to_release = min(new_value, 0) - old_value;
+	#ifdef JPH_PLATFORM_WINDOWS
 		::ReleaseSemaphore(mSemaphore, num_to_release, nullptr);
+	#elif defined(JPH_USE_PTHREADS)
+		for (int i = 0; i < num_to_release; ++i)
+			sem_post(&mSemaphore);
+	#elif defined(JPH_USE_GRAND_CENTRAL_DISPATCH)
+		for (int i = 0; i < num_to_release; ++i)
+			dispatch_semaphore_signal(mSemaphore);
+	#endif
 	}
 #else
 	std::lock_guard lock(mLock);
@@ -63,14 +93,20 @@ void Semaphore::Acquire(uint inNumber)
 {
 	JPH_ASSERT(inNumber > 0);
 
-#ifdef JPH_PLATFORM_WINDOWS
-	int old_value = mCount.fetch_sub(inNumber);
+#if defined(JPH_PLATFORM_WINDOWS) || defined(JPH_USE_PTHREADS) || defined(JPH_USE_GRAND_CENTRAL_DISPATCH)
+	int old_value = mCount.fetch_sub(inNumber, std::memory_order_acquire);
 	int new_value = old_value - (int)inNumber;
 	if (new_value < 0)
 	{
 		int num_to_acquire = min(old_value, 0) - new_value;
 		for (int i = 0; i < num_to_acquire; ++i)
+		#ifdef JPH_PLATFORM_WINDOWS
 			WaitForSingleObject(mSemaphore, INFINITE);
+		#elif defined(JPH_USE_PTHREADS)
+			sem_wait(&mSemaphore);
+		#elif defined(JPH_USE_GRAND_CENTRAL_DISPATCH)
+			dispatch_semaphore_wait(mSemaphore, DISPATCH_TIME_FOREVER);
+		#endif
 	}
 #else
 	std::unique_lock lock(mLock);

+ 30 - 19
Jolt/Core/Semaphore.h

@@ -4,47 +4,58 @@
 
 #pragma once
 
+#include <Jolt/Core/Atomics.h>
+
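+// Select the semaphore backend: POSIX semaphores, Grand Central Dispatch, or (on other non-Windows platforms) the mutex / condition variable fallback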
 JPH_SUPPRESS_WARNINGS_STD_BEGIN
-#include <atomic>
-#include <mutex>
-#include <condition_variable>
+#if defined(JPH_PLATFORM_LINUX) || defined(JPH_PLATFORM_ANDROID) || defined(JPH_PLATFORM_BSD) || defined(JPH_PLATFORM_WASM)
+	#include <semaphore.h>
+	#define JPH_USE_PTHREADS
+#elif defined(JPH_PLATFORM_MACOS) || defined(JPH_PLATFORM_IOS)
+	#include <dispatch/dispatch.h>
+	#define JPH_USE_GRAND_CENTRAL_DISPATCH
+#elif !defined(JPH_PLATFORM_WINDOWS)
+	#include <mutex>
+	#include <condition_variable>
+#endif
 JPH_SUPPRESS_WARNINGS_STD_END
 
 JPH_NAMESPACE_BEGIN
 
-// Things we're using from STL
-using std::atomic;
-using std::mutex;
-using std::condition_variable;
-
 /// Implements a counting semaphore. Depending on the platform it wraps a Windows semaphore handle, a POSIX sem_t or a Grand Central Dispatch semaphore; other platforms emulate it with a mutex and a condition variable.
 /// When we switch to C++20 we can use counting_semaphore to unify this
 class JPH_EXPORT Semaphore
 {
 public:
 	/// Constructor
-						Semaphore();
-						~Semaphore();
+							Semaphore();
+							~Semaphore();
 
 	/// Release the semaphore, signaling the thread waiting on the barrier that there may be work
-	void				Release(uint inNumber = 1);
+	void					Release(uint inNumber = 1);
 
 	/// Acquire the semaphore inNumber times
-	void				Acquire(uint inNumber = 1);
+	void					Acquire(uint inNumber = 1);
 
 	/// Get the current value of the semaphore (read with relaxed memory ordering)
-	inline int			GetValue() const								{ return mCount.load(std::memory_order_relaxed); }
+	inline int				GetValue() const								{ return mCount.load(std::memory_order_relaxed); }
 
 private:
+#if defined(JPH_PLATFORM_WINDOWS) || defined(JPH_USE_PTHREADS) || defined(JPH_USE_GRAND_CENTRAL_DISPATCH) // Platforms that have a native semaphore object
 #ifdef JPH_PLATFORM_WINDOWS
-	// On windows we use a semaphore object since it is more efficient than a lock and a condition variable
-	alignas(JPH_CACHE_LINE_SIZE) atomic<int> mCount { 0 };				///< We increment mCount for every release, to acquire we decrement the count. If the count is negative we know that we are waiting on the actual semaphore.
-	void *				mSemaphore;										///< The semaphore is an expensive construct so we only acquire/release it if we know that we need to wait/have waiting threads
+	using SemaphoreType = void *; // Holds the handle returned by CreateSemaphore
+#elif defined(JPH_USE_PTHREADS)
+	using SemaphoreType = sem_t; // POSIX semaphore, set up with sem_init
+#elif defined(JPH_USE_GRAND_CENTRAL_DISPATCH)
+	using SemaphoreType = dispatch_semaphore_t; // Created with dispatch_semaphore_create
+#endif
+	alignas(JPH_CACHE_LINE_SIZE) atomic<int> mCount { 0 };					///< We increment mCount for every release, to acquire we decrement the count. If the count is negative we know that we are waiting on the actual semaphore.
+	SemaphoreType			mSemaphore;										///< The semaphore is an expensive construct so we only acquire/release it if we know that we need to wait/have waiting threads
 #else
 	// Other platforms: Emulate a semaphore using a mutex, condition variable and count
-	mutex				mLock;
-	condition_variable	mWaitVariable;
-	atomic<int>			mCount { 0 };
+	std::mutex				mLock; // Locked by both Release and Acquire in the fallback implementation
+	std::condition_variable	mWaitVariable; // NOTE(review): presumably what Acquire waits on and Release notifies; usage is not visible in this diff
+	atomic<int>				mCount { 0 }; // Current value of the semaphore, returned by GetValue()
 #endif
 };