瀏覽代碼

Split off Barrier implementation to JobSystemWithBarrier (#464)

This simplifies creating your own implementation of JobSystem by already implementing the Barrier class
Jorrit Rouwe 2 年之前
父節點
當前提交
53e1608634

+ 3 - 2
Jolt/Core/JobSystem.h

@@ -42,6 +42,7 @@ JPH_NAMESPACE_BEGIN
 ///
 /// If you want to implement your own job system, inherit from JobSystem and implement:
 ///
+/// * JobSystem::GetMaxConcurrency - This should return the maximum number of jobs that can run in parallel.
 /// * JobSystem::CreateJob - This should create a Job object and return it to the caller.
 /// * JobSystem::FreeJob - This should free the memory associated with the job object. It is called by the Job destructor when it is Release()-ed for the last time.
 /// * JobSystem::QueueJob/QueueJobs - These should store the job pointer in an internal queue to run immediately (dependencies are tracked internally, this function is called when the job can run).
@@ -61,10 +62,10 @@ JPH_NAMESPACE_BEGIN
 /// * JobSystem::DestroyBarrier - Destroy a barrier.
 /// * JobSystem::WaitForJobs - This is the main function that is used to wait for all jobs that have been added to a Barrier. WaitForJobs can execute jobs that have
 /// been added to the barrier while waiting. It is not wise to execute other jobs that touch physics structures as this can cause race conditions and deadlocks. Please keep in mind that the barrier is 
-/// only intended to wait on the completion of the jolt jobs added to it, if you scheduled any jobs in your engine's job system to execute the Jolt jobs as part of QueueJob/QueueJobs, you might still need 
+/// only intended to wait on the completion of the Jolt jobs added to it. If you scheduled any jobs in your engine's job system to execute the Jolt jobs as part of QueueJob/QueueJobs, you might still need 
 /// to wait for these in this function after the barrier is finished waiting.
 ///
-/// An example implementation is JobSystemThreadPool, you can also use this as an example of how to implement Barriers (this implementation is not dependent on the underlying job system).
+/// An example implementation is JobSystemThreadPool. If you don't want to write the Barrier class you can also inherit from JobSystemWithBarrier.
 class JobSystem : public NonCopyable
 {
 protected:

+ 1 - 255
Jolt/Core/JobSystemThreadPool.cpp

@@ -23,220 +23,9 @@
 
 JPH_NAMESPACE_BEGIN
 
-JobSystemThreadPool::Semaphore::Semaphore()
-{
-#ifdef JPH_PLATFORM_WINDOWS
-	mSemaphore = CreateSemaphore(nullptr, 0, INT_MAX, nullptr);
-#endif
-}
-
-JobSystemThreadPool::Semaphore::~Semaphore()
-{
-#ifdef JPH_PLATFORM_WINDOWS
-	CloseHandle(mSemaphore);
-#endif
-}
-
-void JobSystemThreadPool::Semaphore::Release(uint inNumber)
-{
-	JPH_ASSERT(inNumber > 0);
-
-#ifdef JPH_PLATFORM_WINDOWS
-	int old_value = mCount.fetch_add(inNumber);
-	if (old_value < 0)
-	{
-		int new_value = old_value + (int)inNumber;
-		int num_to_release = min(new_value, 0) - old_value;
-		::ReleaseSemaphore(mSemaphore, num_to_release, nullptr);
-	}
-#else
-	lock_guard lock(mLock);
-	mCount += (int)inNumber;
-	if (inNumber > 1)
-		mWaitVariable.notify_all();
-	else
-		mWaitVariable.notify_one();
-#endif
-}
-
-void JobSystemThreadPool::Semaphore::Acquire(uint inNumber)
-{
-	JPH_ASSERT(inNumber > 0);
-
-#ifdef JPH_PLATFORM_WINDOWS
-	int old_value = mCount.fetch_sub(inNumber);
-	int new_value = old_value - (int)inNumber;
-	if (new_value < 0)
-	{
-		int num_to_acquire = min(old_value, 0) - new_value;
-		for (int i = 0; i < num_to_acquire; ++i)
-			WaitForSingleObject(mSemaphore, INFINITE);
-	}
-#else
-	unique_lock lock(mLock);
-	mCount -= (int)inNumber;
-	mWaitVariable.wait(lock, [this]() { return mCount >= 0; });
-#endif
-}
-
-JobSystemThreadPool::BarrierImpl::BarrierImpl()
-{
-	for (atomic<Job *> &j : mJobs)
-		j = nullptr;
-}
-
-JobSystemThreadPool::BarrierImpl::~BarrierImpl()
-{
-	JPH_ASSERT(IsEmpty());
-}
-
-void JobSystemThreadPool::BarrierImpl::AddJob(const JobHandle &inJob)
-{
-	JPH_PROFILE_FUNCTION();
-
-	bool release_semaphore = false;
-
-	// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
-	Job *job = inJob.GetPtr();
-	if (job->SetBarrier(this))
-	{
-		// If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
-		mNumToAcquire++;
-		if (job->CanBeExecuted())
-		{
-			release_semaphore = true;
-			mNumToAcquire++;
-		}
-
-		// Add the job to our job list
-		job->AddRef();
-		uint write_index = mJobWriteIndex++;
-		while (write_index - mJobReadIndex >= cMaxJobs)
-		{
-			JPH_ASSERT(false, "Barrier full, stalling!");
-			std::this_thread::sleep_for(std::chrono::microseconds(100));
-		}
-		mJobs[write_index & (cMaxJobs - 1)] = job;
-	}
-
-	// Notify waiting thread that a new executable job is available
-	if (release_semaphore)
-		mSemaphore.Release();
-}
-
-void JobSystemThreadPool::BarrierImpl::AddJobs(const JobHandle *inHandles, uint inNumHandles)
-{
-	JPH_PROFILE_FUNCTION();
-
-	bool release_semaphore = false;
-
-	for (const JobHandle *handle = inHandles, *handles_end = inHandles + inNumHandles; handle < handles_end; ++handle)
-	{
-		// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
-		Job *job = handle->GetPtr();
-		if (job->SetBarrier(this))
-		{
-			// If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
-			mNumToAcquire++;
-			if (!release_semaphore && job->CanBeExecuted())
-			{
-				release_semaphore = true;
-				mNumToAcquire++;
-			}
-
-			// Add the job to our job list
-			job->AddRef();
-			uint write_index = mJobWriteIndex++;
-			while (write_index - mJobReadIndex >= cMaxJobs)
-			{
-				JPH_ASSERT(false, "Barrier full, stalling!");
-				std::this_thread::sleep_for(std::chrono::microseconds(100));
-			}
-			mJobs[write_index & (cMaxJobs - 1)] = job;
-		}
-	}
-
-	// Notify waiting thread that a new executable job is available
-	if (release_semaphore)
-		mSemaphore.Release();
-}
-
-void JobSystemThreadPool::BarrierImpl::OnJobFinished(Job *inJob)
-{
-	JPH_PROFILE_FUNCTION();
-
-	mSemaphore.Release();
-}
-
-void JobSystemThreadPool::BarrierImpl::Wait()
-{
-	while (mNumToAcquire > 0)
-	{
-		{
-			JPH_PROFILE("Execute Jobs");
-
-			// Go through all jobs
-			bool has_executed;
-			do
-			{
-				has_executed = false;
-
-				// Loop through the jobs and erase jobs from the beginning of the list that are done
-				while (mJobReadIndex < mJobWriteIndex)
-				{				
-					atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
-					Job *job_ptr = job.load();
-					if (job_ptr == nullptr || !job_ptr->IsDone())
-						break;
-
-					// Job is finished, release it
-					job_ptr->Release();
-					job = nullptr;
-					++mJobReadIndex;
-				}
-
-				// Loop through the jobs and execute the first executable job
-				for (uint index = mJobReadIndex; index < mJobWriteIndex; ++index)
-				{
-					const atomic<Job *> &job = mJobs[index & (cMaxJobs - 1)];
-					Job *job_ptr = job.load();
-					if (job_ptr != nullptr && job_ptr->CanBeExecuted())
-					{
-						// This will only execute the job if it has not already executed
-						job_ptr->Execute();
-						has_executed = true;
-						break;
-					}
-				}
-
-			} while (has_executed);
-		}
-
-		// Wait for another thread to wake us when either there is more work to do or when all jobs have completed
-		int num_to_acquire = max(1, mSemaphore.GetValue()); // When there have been multiple releases, we acquire them all at the same time to avoid needlessly spinning on executing jobs
-		mSemaphore.Acquire(num_to_acquire);
-		mNumToAcquire -= num_to_acquire;
-	}
-
-	// All jobs should be done now, release them
-	while (mJobReadIndex < mJobWriteIndex)
-	{				
-		atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
-		Job *job_ptr = job.load();
-		JPH_ASSERT(job_ptr != nullptr && job_ptr->IsDone());
-		job_ptr->Release();
-		job = nullptr;
-		++mJobReadIndex;
-	}
-}
-
 void JobSystemThreadPool::Init(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
 {
-	JPH_ASSERT(mBarriers == nullptr); // Already initialized?
-
-	// Init freelist of barriers
-	mMaxBarriers = inMaxBarriers;
-	mBarriers = new BarrierImpl [inMaxBarriers];
+	JobSystemWithBarrier::Init(inMaxBarriers);
 
 	// Init freelist of jobs
 	mJobs.Init(inMaxJobs, inMaxJobs);
@@ -283,13 +72,6 @@ JobSystemThreadPool::~JobSystemThreadPool()
 {
 	// Stop all worker threads
 	StopThreads();
-
-	// Ensure that none of the barriers are used
-#ifdef JPH_ENABLE_ASSERTS
-	for (const BarrierImpl *b = mBarriers, *b_end = mBarriers + mMaxBarriers; b < b_end; ++b)
-		JPH_ASSERT(!b->mInUse);
-#endif // JPH_ENABLE_ASSERTS
-	delete [] mBarriers;
 }
 
 void JobSystemThreadPool::StopThreads()
@@ -360,42 +142,6 @@ void JobSystemThreadPool::FreeJob(Job *inJob)
 	mJobs.DestructObject(inJob);
 }
 
-JobSystem::Barrier *JobSystemThreadPool::CreateBarrier()
-{
-	JPH_PROFILE_FUNCTION();
-
-	// Find the first unused barrier
-	for (uint32 index = 0; index < mMaxBarriers; ++index)
-	{
-		bool expected = false;
-		if (mBarriers[index].mInUse.compare_exchange_strong(expected, true))
-			return &mBarriers[index];
-	}
-
-	return nullptr;
-}
-
-void JobSystemThreadPool::DestroyBarrier(Barrier *inBarrier)
-{
-	JPH_PROFILE_FUNCTION();
-
-	// Check that no jobs are in the barrier
-	JPH_ASSERT(static_cast<BarrierImpl *>(inBarrier)->IsEmpty());
-
-	// Flag the barrier as unused
-	bool expected = true;
-	static_cast<BarrierImpl *>(inBarrier)->mInUse.compare_exchange_strong(expected, false);
-	JPH_ASSERT(expected);
-}
-
-void JobSystemThreadPool::WaitForJobs(Barrier *inBarrier)
-{
-	JPH_PROFILE_FUNCTION();
-
-	// Let our barrier implementation wait for the jobs
-	static_cast<BarrierImpl *>(inBarrier)->Wait();
-}
-
 uint JobSystemThreadPool::GetHead() const
 {
 	// Find the minimal value across all threads

+ 3 - 79
Jolt/Core/JobSystemThreadPool.h

@@ -4,28 +4,25 @@
 
 #pragma once
 
-#include <Jolt/Core/JobSystem.h>
+#include <Jolt/Core/JobSystemWithBarrier.h>
 #include <Jolt/Core/FixedSizeFreeList.h>
+#include <Jolt/Core/Semaphore.h>
 
 JPH_SUPPRESS_WARNINGS_STD_BEGIN
 #include <thread>
-#include <mutex>
-#include <condition_variable>
 JPH_SUPPRESS_WARNINGS_STD_END
 
 JPH_NAMESPACE_BEGIN
 
 // Things we're using from STL
-using std::atomic;
 using std::thread;
-using std::condition_variable;
 
 /// Implementation of a JobSystem using a thread pool
 /// 
 /// Note that this is considered an example implementation. It is expected that when you integrate
 /// the physics engine into your own project that you'll provide your own implementation of the
 /// JobSystem built on top of whatever job system your project uses.
-class JobSystemThreadPool final : public JobSystem
+class JobSystemThreadPool final : public JobSystemWithBarrier
 {
 public:
 	JPH_OVERRIDE_NEW_DELETE
@@ -45,9 +42,6 @@ public:
 	// See JobSystem
 	virtual int				GetMaxConcurrency() const override				{ return int(mThreads.size()) + 1; }
 	virtual JobHandle		CreateJob(const char *inName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies = 0) override;
-	virtual Barrier *		CreateBarrier() override;
-	virtual void			DestroyBarrier(Barrier *inBarrier) override;
-	virtual void			WaitForJobs(Barrier *inBarrier) override;
 
 	/// Change the max concurrency after initialization
 	void					SetNumThreads(int inNumThreads)					{ StopThreads(); StartThreads(inNumThreads); }
@@ -59,72 +53,6 @@ protected:
 	virtual void			FreeJob(Job *inJob) override;
 
 private:
-	/// When we switch to C++20 we can use counting_semaphore to unify this
-	class Semaphore
-	{
-	public:
-		/// Constructor
-		inline				Semaphore();
-		inline				~Semaphore();
-
-		/// Release the semaphore, signalling the thread waiting on the barrier that there may be work
-		inline void			Release(uint inNumber = 1);
-
-		/// Acquire the semaphore inNumber times
-		inline void			Acquire(uint inNumber = 1);
-
-		/// Get the current value of the semaphore
-		inline int			GetValue() const								{ return mCount; }
-
-	private:
-#ifdef JPH_PLATFORM_WINDOWS
-		// On windows we use a semaphore object since it is more efficient than a lock and a condition variable
-		alignas(JPH_CACHE_LINE_SIZE) atomic<int> mCount { 0 };				///< We increment mCount for every release, to acquire we decrement the count. If the count is negative we know that we are waiting on the actual semaphore.
-		void *				mSemaphore;										///< The semaphore is an expensive construct so we only acquire/release it if we know that we need to wait/have waiting threads
-#else
-		// Other platforms: Emulate a semaphore using a mutex, condition variable and count
-		mutex				mLock;
-		condition_variable	mWaitVariable;
-		int					mCount = 0;
-#endif
-	};
-
-	class BarrierImpl : public Barrier
-	{
-	public:
-		JPH_OVERRIDE_NEW_DELETE
-
-		/// Constructor
-							BarrierImpl();
-		virtual				~BarrierImpl() override;
-
-		// See Barrier
-		virtual void		AddJob(const JobHandle &inJob) override;
-		virtual void		AddJobs(const JobHandle *inHandles, uint inNumHandles) override;
-
-		/// Check if there are any jobs in the job barrier
-		inline bool			IsEmpty() const									{ return mJobReadIndex == mJobWriteIndex; }
-
-		/// Wait for all jobs in this job barrier, while waiting, execute jobs that are part of this barrier on the current thread
-		void				Wait();
-
-		/// Flag to indicate if a barrier has been handed out
-		atomic<bool>		mInUse { false };
-
-	protected:
-		/// Called by a Job to mark that it is finished
-		virtual void		OnJobFinished(Job *inJob) override;
-
-		/// Jobs queue for the barrier
-		static constexpr uint cMaxJobs = 2048;
-		static_assert(IsPowerOf2(cMaxJobs));								// We do bit operations and require max jobs to be a power of 2
-		atomic<Job *> 		mJobs[cMaxJobs];								///< List of jobs that are part of this barrier, nullptrs for empty slots
-		alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobReadIndex { 0 };		///< First job that could be valid (modulo cMaxJobs), can be nullptr if other thread is still working on adding the job
-		alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobWriteIndex { 0 };		///< First job that can be written (modulo cMaxJobs)
-		atomic<int>			mNumToAcquire { 0 };							///< Number of times the semaphore has been released, the barrier should acquire the semaphore this many times (written at the same time as mJobWriteIndex so ok to put in same cache line)
-		Semaphore			mSemaphore;										///< Semaphore used by finishing jobs to signal the barrier that they're done
-	};
-
 	/// Start/stop the worker threads
 	void					StartThreads(int inNumThreads);
 	void					StopThreads();
@@ -142,10 +70,6 @@ private:
 	using AvailableJobs = FixedSizeFreeList<Job>;
 	AvailableJobs			mJobs;
 
-	/// Array of barriers (we keep them constructed all the time since constructing a semaphore/mutex is not cheap)
-	uint					mMaxBarriers = 0;								///< Max amount of barriers
-	BarrierImpl *			mBarriers = nullptr;							///< List of the actual barriers
-
 	/// Threads running jobs
 	Array<thread>			mThreads;
 

+ 227 - 0
Jolt/Core/JobSystemWithBarrier.cpp

@@ -0,0 +1,227 @@
+// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
+// SPDX-FileCopyrightText: 2023 Jorrit Rouwe
+// SPDX-License-Identifier: MIT
+
+#include <Jolt/Jolt.h>
+
+#include <Jolt/Core/JobSystemWithBarrier.h>
+#include <Jolt/Core/Profiler.h>
+
+JPH_SUPPRESS_WARNINGS_STD_BEGIN
+#include <thread>
+JPH_SUPPRESS_WARNINGS_STD_END
+
+JPH_NAMESPACE_BEGIN
+
+JobSystemWithBarrier::BarrierImpl::BarrierImpl()
+{
+	for (atomic<Job *> &slot : mJobs) // Start with every slot of the job list marked as empty
+		slot.store(nullptr);
+}
+
+JobSystemWithBarrier::BarrierImpl::~BarrierImpl()
+{
+	JPH_ASSERT(mJobReadIndex == mJobWriteIndex); // The job list must be empty (everything that was added has been released again)
+}
+
+void JobSystemWithBarrier::BarrierImpl::AddJob(const JobHandle &inJob)
+{ // Registers inJob with this barrier: the job is stored in the barrier's job list and mNumToAcquire is raised so that Wait() keeps waiting for it
+	JPH_PROFILE_FUNCTION();
+
+	bool release_semaphore = false;
+
+	// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
+	Job *job = inJob.GetPtr();
+	if (job->SetBarrier(this))
+	{
+		// Wait() has to acquire the semaphore once for the release that OnJobFinished does for this job. If the job can be executed we want to release the semaphore an extra time (which needs one more acquire) to allow the waiting thread to start executing it
+		mNumToAcquire++;
+		if (job->CanBeExecuted())
+		{
+			release_semaphore = true;
+			mNumToAcquire++;
+		}
+
+		// Add the job to our job list: take a reference (dropped again in Wait), atomically reserve a slot and then publish the job pointer in that slot
+		job->AddRef();
+		uint write_index = mJobWriteIndex++;
+		while (write_index - mJobReadIndex >= cMaxJobs) // All cMaxJobs slots ahead of the read index are taken, stall until Wait() has advanced the read index
+		{
+			JPH_ASSERT(false, "Barrier full, stalling!");
+			std::this_thread::sleep_for(std::chrono::microseconds(100));
+		}
+		mJobs[write_index & (cMaxJobs - 1)] = job; // cMaxJobs is a power of 2 so the mask maps the index onto a slot, until this store the slot still holds nullptr
+	}
+
+	// Notify waiting thread that a new executable job is available
+	if (release_semaphore)
+		mSemaphore.Release();
+}
+
+void JobSystemWithBarrier::BarrierImpl::AddJobs(const JobHandle *inHandles, uint inNumHandles)
+{ // Registers inNumHandles jobs starting at inHandles with this barrier, the batch equivalent of AddJob
+	JPH_PROFILE_FUNCTION();
+
+	bool release_semaphore = false;
+
+	for (const JobHandle *handle = inHandles, *handles_end = inHandles + inNumHandles; handle < handles_end; ++handle)
+	{
+		// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
+		Job *job = handle->GetPtr();
+		if (job->SetBarrier(this))
+		{
+			// Wait() has to acquire the semaphore once for the release that OnJobFinished does for this job. If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it (done at most once for the whole batch)
+			mNumToAcquire++;
+			if (!release_semaphore && job->CanBeExecuted())
+			{
+				release_semaphore = true;
+				mNumToAcquire++;
+			}
+
+			// Add the job to our job list: take a reference (dropped again in Wait), atomically reserve a slot and then publish the job pointer in that slot
+			job->AddRef();
+			uint write_index = mJobWriteIndex++;
+			while (write_index - mJobReadIndex >= cMaxJobs) // All cMaxJobs slots ahead of the read index are taken, stall until Wait() has advanced the read index
+			{
+				JPH_ASSERT(false, "Barrier full, stalling!");
+				std::this_thread::sleep_for(std::chrono::microseconds(100));
+			}
+			mJobs[write_index & (cMaxJobs - 1)] = job; // cMaxJobs is a power of 2 so the mask maps the index onto a slot
+		}
+	}
+
+	// Notify waiting thread that a new executable job is available (a single release, matching the single extra mNumToAcquire increment above)
+	if (release_semaphore)
+		mSemaphore.Release();
+}
+
+void JobSystemWithBarrier::BarrierImpl::OnJobFinished(Job *inJob)
+{
+	JPH_PROFILE_FUNCTION();
+	// A finished job signals the semaphore exactly once, Wait() is the one acquiring it
+	mSemaphore.Release(1);
+}
+
+void JobSystemWithBarrier::BarrierImpl::Wait()
+{ // Waits for all jobs in this barrier; while waiting, jobs of this barrier that can be executed are run on the calling thread
+	while (mNumToAcquire > 0) // There are still semaphore releases (announced by AddJob/AddJobs) that have not been acquired here
+	{
+		{
+			JPH_PROFILE("Execute Jobs");
+
+			// Go through all jobs, repeat for as long as the previous pass found a job to execute
+			bool has_executed;
+			do
+			{
+				has_executed = false;
+
+				// Loop through the jobs and erase jobs from the beginning of the list that are done (stops at the first slot that is still nullptr or whose job is not done yet)
+				while (mJobReadIndex < mJobWriteIndex)
+				{
+					atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
+					Job *job_ptr = job.load();
+					if (job_ptr == nullptr || !job_ptr->IsDone())
+						break;
+
+					// Job is finished, release it (this drops the reference taken when the job was added) and mark the slot as empty
+					job_ptr->Release();
+					job = nullptr;
+					++mJobReadIndex;
+				}
+
+				// Loop through the jobs and execute the first executable job
+				for (uint index = mJobReadIndex; index < mJobWriteIndex; ++index)
+				{
+					const atomic<Job *> &job = mJobs[index & (cMaxJobs - 1)];
+					Job *job_ptr = job.load();
+					if (job_ptr != nullptr && job_ptr->CanBeExecuted())
+					{
+						// This will only execute the job if it has not already executed
+						job_ptr->Execute();
+						has_executed = true;
+						break;
+					}
+				}
+
+			} while (has_executed);
+		}
+
+		// Wait for another thread to wake us when either there is more work to do or when all jobs have completed
+		int num_to_acquire = max(1, mSemaphore.GetValue()); // When there have been multiple releases, we acquire them all at the same time to avoid needlessly spinning on executing jobs
+		mSemaphore.Acquire(num_to_acquire);
+		mNumToAcquire -= num_to_acquire;
+	}
+
+	// All jobs should be done now, release them. NOTE(review): the '<' comparisons between the uint read/write indices in this function look like they assume the indices never wrap around - confirm
+	while (mJobReadIndex < mJobWriteIndex)
+	{
+		atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
+		Job *job_ptr = job.load();
+		JPH_ASSERT(job_ptr != nullptr && job_ptr->IsDone());
+		job_ptr->Release();
+		job = nullptr;
+		++mJobReadIndex;
+	}
+}
+
+void JobSystemWithBarrier::Init(uint inMaxBarriers)
+{
+	// Guard against initializing twice, the barriers allocated by the first call would be lost
+	JPH_ASSERT(mBarriers == nullptr);
+
+	mMaxBarriers = inMaxBarriers;
+	mBarriers = new BarrierImpl [mMaxBarriers]; // All barriers are constructed up front, CreateBarrier only hands them out
+}
+
+JobSystemWithBarrier::JobSystemWithBarrier(uint inMaxBarriers)
+{
+	JobSystemWithBarrier::Init(inMaxBarriers); // Allocate the barriers right away so that no separate Init call is needed
+}
+
+JobSystemWithBarrier::~JobSystemWithBarrier()
+{
+	// By now every barrier should have been handed back through DestroyBarrier
+#ifdef JPH_ENABLE_ASSERTS
+	for (uint index = 0; index < mMaxBarriers; ++index)
+		JPH_ASSERT(!mBarriers[index].mInUse);
+#endif // JPH_ENABLE_ASSERTS
+	delete [] mBarriers;
+}
+
+JobSystem::Barrier *JobSystemWithBarrier::CreateBarrier()
+{
+	JPH_PROFILE_FUNCTION();
+
+	// Claim the first barrier whose in-use flag we manage to flip from false to true
+	for (BarrierImpl *barrier = mBarriers, *barriers_end = mBarriers + mMaxBarriers; barrier < barriers_end; ++barrier)
+	{
+		bool was_in_use = false;
+		if (barrier->mInUse.compare_exchange_strong(was_in_use, true))
+			return barrier;
+	}
+	// Every barrier is currently handed out
+	return nullptr;
+}
+
+void JobSystemWithBarrier::DestroyBarrier(Barrier *inBarrier)
+{
+	JPH_PROFILE_FUNCTION();
+
+	// The barrier is treated as one of our own BarrierImpl objects
+	BarrierImpl *barrier = static_cast<BarrierImpl *>(inBarrier);
+	JPH_ASSERT(barrier->IsEmpty()); // No jobs may be left in the barrier
+	// Hand the barrier back by clearing its in-use flag, the flag must have been set until now
+	bool was_in_use = true;
+	barrier->mInUse.compare_exchange_strong(was_in_use, false);
+	JPH_ASSERT(was_in_use);
+}
+
+void JobSystemWithBarrier::WaitForJobs(Barrier *inBarrier)
+{
+	JPH_PROFILE_FUNCTION();
+	// The waiting itself (including executing jobs while waiting) is implemented by the barrier
+	BarrierImpl *barrier = static_cast<BarrierImpl *>(inBarrier);
+	barrier->Wait();
+}
+
+JPH_NAMESPACE_END

+ 85 - 0
Jolt/Core/JobSystemWithBarrier.h

@@ -0,0 +1,85 @@
+// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
+// SPDX-FileCopyrightText: 2023 Jorrit Rouwe
+// SPDX-License-Identifier: MIT
+
+#pragma once
+
+#include <Jolt/Core/JobSystem.h>
+#include <Jolt/Core/Semaphore.h>
+
+JPH_NAMESPACE_BEGIN
+
+/// Implementation of the Barrier class for a JobSystem
+///
+/// This class can be used to make it easier to create a new JobSystem implementation that integrates with your own job system.
+/// It will implement all functionality relating to barriers, so the only functions that are left to be implemented are:
+///
+/// * JobSystem::GetMaxConcurrency
+/// * JobSystem::CreateJob
+/// * JobSystem::FreeJob
+/// * JobSystem::QueueJob/QueueJobs
+///
+/// See instructions in JobSystem for more information on how to implement these.
+class JobSystemWithBarrier : public JobSystem
+{
+public:
+	JPH_OVERRIDE_NEW_DELETE
+
+	/// Constructs the job system and immediately allocates inMaxBarriers barriers (same as default constructing followed by Init)
+	/// @see JobSystemWithBarrier::Init
+	explicit				JobSystemWithBarrier(uint inMaxBarriers);
+							JobSystemWithBarrier() = default;				///< Constructs the job system without barriers, call Init to allocate them
+	virtual					~JobSystemWithBarrier() override;
+
+	/// Initialize the barriers, asserts that no barriers have been allocated before
+	/// @param inMaxBarriers Max number of barriers that can be allocated at any time
+	void					Init(uint inMaxBarriers);
+
+	// See JobSystem, CreateBarrier returns nullptr when all barriers are in use
+	virtual Barrier *		CreateBarrier() override;
+	virtual void			DestroyBarrier(Barrier *inBarrier) override;
+	virtual void			WaitForJobs(Barrier *inBarrier) override;
+
+private:
+	class BarrierImpl : public Barrier										// Barrier that keeps its jobs in a fixed size list and waits for them through a semaphore
+	{
+	public:
+		JPH_OVERRIDE_NEW_DELETE
+
+		/// Constructor
+							BarrierImpl();
+		virtual				~BarrierImpl() override;
+
+		// See Barrier
+		virtual void		AddJob(const JobHandle &inJob) override;
+		virtual void		AddJobs(const JobHandle *inHandles, uint inNumHandles) override;
+
+		/// Check if there are any jobs in the job barrier
+		inline bool			IsEmpty() const									{ return mJobReadIndex == mJobWriteIndex; }
+
+		/// Wait for all jobs in this job barrier, while waiting, execute jobs that are part of this barrier on the current thread
+		void				Wait();
+
+		/// Flag to indicate if a barrier has been handed out (set by CreateBarrier, cleared by DestroyBarrier)
+		atomic<bool>		mInUse { false };
+
+	protected:
+		/// Called by a Job to mark that it is finished
+		virtual void		OnJobFinished(Job *inJob) override;
+
+		/// Jobs queue for the barrier
+		static constexpr uint cMaxJobs = 2048;
+		static_assert(IsPowerOf2(cMaxJobs));								// We do bit operations and require max jobs to be a power of 2
+		atomic<Job *> 		mJobs[cMaxJobs];								///< List of jobs that are part of this barrier, nullptrs for empty slots
+		alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobReadIndex { 0 };		///< First job that could be valid (modulo cMaxJobs), can be nullptr if other thread is still working on adding the job
+		alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobWriteIndex { 0 };		///< First job that can be written (modulo cMaxJobs)
+		atomic<int>			mNumToAcquire { 0 };							///< Number of times the semaphore has been released, the barrier should acquire the semaphore this many times (written at the same time as mJobWriteIndex so ok to put in same cache line)
+		Semaphore			mSemaphore;										///< Semaphore used by finishing jobs to signal the barrier that they're done
+	};
+
+	/// Array of barriers (we keep them constructed all the time since constructing a semaphore/mutex is not cheap)
+	uint					mMaxBarriers = 0;								///< Max amount of barriers
+	BarrierImpl *			mBarriers = nullptr;							///< List of the actual barriers
+};
+
+JPH_NAMESPACE_END

+ 80 - 0
Jolt/Core/Semaphore.cpp

@@ -0,0 +1,80 @@
+// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
+// SPDX-FileCopyrightText: 2023 Jorrit Rouwe
+// SPDX-License-Identifier: MIT
+
+#include <Jolt/Jolt.h>
+
+#include <Jolt/Core/Semaphore.h>
+
+#ifdef JPH_PLATFORM_WINDOWS
+	JPH_SUPPRESS_WARNING_PUSH
+	JPH_MSVC_SUPPRESS_WARNING(5039) // winbase.h(13179): warning C5039: 'TpSetCallbackCleanupGroup': pointer or reference to potentially throwing function passed to 'extern "C"' function under -EHc. Undefined behavior may occur if this function throws an exception.
+	#define WIN32_LEAN_AND_MEAN
+#ifndef JPH_COMPILER_MINGW
+	#include <Windows.h>
+#else
+	#include <windows.h>
+#endif
+
+	JPH_SUPPRESS_WARNING_POP
+#endif
+
+JPH_NAMESPACE_BEGIN
+
+Semaphore::Semaphore()
+{
+#ifdef JPH_PLATFORM_WINDOWS
+	mSemaphore = CreateSemaphore(nullptr, 0, INT_MAX, nullptr); // Unnamed semaphore object with default security attributes, an initial count of 0 and a maximum count of INT_MAX
+#endif
+}
+
+Semaphore::~Semaphore()
+{
+#ifdef JPH_PLATFORM_WINDOWS
+	CloseHandle(mSemaphore); // Close the handle that was created in the constructor
+#endif
+}
+
+void Semaphore::Release(uint inNumber)
+{
+	JPH_ASSERT(inNumber > 0);
+	// Raises the semaphore count by inNumber and wakes up threads that are blocked in Acquire
+#ifdef JPH_PLATFORM_WINDOWS
+	int old_value = mCount.fetch_add(inNumber);
+	if (old_value < 0) // A negative count means that there is a thread waiting on the actual semaphore object
+	{
+		int new_value = old_value + (int)inNumber;
+		int num_to_release = min(new_value, 0) - old_value; // Only the part of inNumber that brings the count from old_value up to at most 0 is passed on to the semaphore object
+		::ReleaseSemaphore(mSemaphore, num_to_release, nullptr);
+	}
+#else
+	std::lock_guard lock(mLock);
+	mCount += (int)inNumber;
+	if (inNumber > 1)
+		mWaitVariable.notify_all(); // Waiters re-check the count themselves in Acquire
+	else
+		mWaitVariable.notify_one();
+#endif
+}
+
+void Semaphore::Acquire(uint inNumber)
+{
+	JPH_ASSERT(inNumber > 0);
+	// Lowers the semaphore count by inNumber and blocks for as long as that leaves the count negative
+#ifdef JPH_PLATFORM_WINDOWS
+	int old_value = mCount.fetch_sub(inNumber);
+	int new_value = old_value - (int)inNumber;
+	if (new_value < 0) // The count was not high enough to cover the whole request
+	{
+		int num_to_acquire = min(old_value, 0) - new_value; // The part of inNumber that was not covered by a positive count
+		for (int i = 0; i < num_to_acquire; ++i)
+			WaitForSingleObject(mSemaphore, INFINITE); // Every wait that returns corresponds to one release of the semaphore object
+	}
+#else
+	std::unique_lock lock(mLock);
+	mCount -= (int)inNumber;
+	mWaitVariable.wait(lock, [this]() { return mCount >= 0; }); // Returns once the count is no longer negative, the lock is not held while sleeping
+#endif
+}
+
+JPH_NAMESPACE_END

+ 51 - 0
Jolt/Core/Semaphore.h

@@ -0,0 +1,51 @@
+// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
+// SPDX-FileCopyrightText: 2023 Jorrit Rouwe
+// SPDX-License-Identifier: MIT
+
+#pragma once
+
+JPH_SUPPRESS_WARNINGS_STD_BEGIN
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+JPH_SUPPRESS_WARNINGS_STD_END
+
+JPH_NAMESPACE_BEGIN
+
+// Things we're using from STL
+using std::atomic;
+using std::mutex;
+using std::condition_variable;
+
+/// Implements a semaphore
+/// When we switch to C++20 we can use counting_semaphore to unify this
+class Semaphore
+{
+public:
+	/// Constructor, the semaphore starts with a value of 0
+						Semaphore();
+						~Semaphore();
+
+	/// Release the semaphore inNumber times, signalling the thread waiting on the barrier that there may be work
+	void				Release(uint inNumber = 1);
+
+	/// Acquire the semaphore inNumber times, blocks until enough releases have been done
+	void				Acquire(uint inNumber = 1);
+
+	/// Get the current value of the semaphore (negative while a thread is waiting in Acquire). Can be called from any thread without locking.
+	inline int			GetValue() const								{ return mCount; }
+
+private:
+#ifdef JPH_PLATFORM_WINDOWS
+	// On windows we use a semaphore object since it is more efficient than a lock and a condition variable
+	alignas(JPH_CACHE_LINE_SIZE) atomic<int> mCount { 0 };				///< We increment mCount for every release, to acquire we decrement the count. If the count is negative we know that we are waiting on the actual semaphore.
+	void *				mSemaphore;										///< The semaphore is an expensive construct so we only acquire/release it if we know that we need to wait/have waiting threads
+#else
+	// Other platforms: Emulate a semaphore using a mutex, condition variable and count
+	mutex				mLock;
+	condition_variable	mWaitVariable;
+	atomic<int>			mCount { 0 };									///< Atomic because GetValue reads the count without taking mLock while Release/Acquire modify it on other threads (a plain int would be a data race)
+#endif
+};
+
+JPH_NAMESPACE_END

+ 4 - 0
Jolt/Jolt.cmake

@@ -34,6 +34,8 @@ set(JOLT_PHYSICS_SRC_FILES
 	${JOLT_PHYSICS_ROOT}/Core/JobSystem.inl
 	${JOLT_PHYSICS_ROOT}/Core/JobSystemThreadPool.cpp
 	${JOLT_PHYSICS_ROOT}/Core/JobSystemThreadPool.h
+	${JOLT_PHYSICS_ROOT}/Core/JobSystemWithBarrier.cpp
+	${JOLT_PHYSICS_ROOT}/Core/JobSystemWithBarrier.h
 	${JOLT_PHYSICS_ROOT}/Core/LinearCurve.cpp
 	${JOLT_PHYSICS_ROOT}/Core/LinearCurve.h
 	${JOLT_PHYSICS_ROOT}/Core/LockFreeHashMap.h
@@ -51,6 +53,8 @@ set(JOLT_PHYSICS_SRC_FILES
 	${JOLT_PHYSICS_ROOT}/Core/Result.h
 	${JOLT_PHYSICS_ROOT}/Core/RTTI.cpp
 	${JOLT_PHYSICS_ROOT}/Core/RTTI.h
+	${JOLT_PHYSICS_ROOT}/Core/Semaphore.cpp
+	${JOLT_PHYSICS_ROOT}/Core/Semaphore.h
 	${JOLT_PHYSICS_ROOT}/Core/StaticArray.h
 	${JOLT_PHYSICS_ROOT}/Core/StreamIn.h
 	${JOLT_PHYSICS_ROOT}/Core/StreamOut.h