Browse Source

move to work-stealing threadpool

Colin Davidson 2 years ago
parent
commit
5f27f2dd7f
4 changed files with 295 additions and 163 deletions
  1. 1 1
      build_odin.sh
  2. 3 2
      src/common.cpp
  3. 143 109
      src/thread_pool.cpp
  4. 148 51
      src/threading.cpp

+ 1 - 1
build_odin.sh

@@ -50,7 +50,7 @@ config_darwin() {
 		panic "Requirement: llvm-config must be base version smaller than 15"
 		panic "Requirement: llvm-config must be base version smaller than 15"
 	fi
 	fi
 
 
-	LDFLAGS="$LDFLAGS -liconv -ldl"
+	LDFLAGS="$LDFLAGS -liconv -ldl -framework System"
 	CXXFLAGS="$CXXFLAGS $($LLVM_CONFIG --cxxflags --ldflags)"
 	CXXFLAGS="$CXXFLAGS $($LLVM_CONFIG --cxxflags --ldflags)"
 	LDFLAGS="$LDFLAGS -lLLVM-C"
 	LDFLAGS="$LDFLAGS -lLLVM-C"
 }
 }

+ 3 - 2
src/common.cpp

@@ -31,7 +31,8 @@
 
 
 gb_internal gbAllocator heap_allocator(void);
 gb_internal gbAllocator heap_allocator(void);
 
 
-#define for_array(index_, array_) for (isize index_ = 0; index_ < (array_).count; index_++)
+#define for_array_off(index_, off_, array_) for (isize index_ = off_; index_ < (array_).count; index_++)
+#define for_array(index_, array_) for_array_off(index_, 0, array_)
 
 
 gb_internal i32 next_pow2(i32 n);
 gb_internal i32 next_pow2(i32 n);
 gb_internal i64 next_pow2(i64 n);
 gb_internal i64 next_pow2(i64 n);
@@ -908,4 +909,4 @@ gb_internal Slice<DistanceAndTarget> did_you_mean_results(DidYouMeanAnswers *d)
 
 
 #if defined(GB_SYSTEM_WINDOWS)
 #if defined(GB_SYSTEM_WINDOWS)
 	#pragma warning(pop)
 	#pragma warning(pop)
-#endif
+#endif

+ 143 - 109
src/thread_pool.cpp

@@ -3,164 +3,198 @@
 struct WorkerTask;
 struct WorkerTask;
 struct ThreadPool;
 struct ThreadPool;
 
 
-#define WORKER_TASK_PROC(name) isize name(void *data)
-typedef WORKER_TASK_PROC(WorkerTaskProc);
+gb_thread_local Thread *current_thread;
 
 
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name);
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name);
 gb_internal void thread_pool_destroy(ThreadPool *pool);
 gb_internal void thread_pool_destroy(ThreadPool *pool);
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
 gb_internal void thread_pool_wait(ThreadPool *pool);
 gb_internal void thread_pool_wait(ThreadPool *pool);
 
 
-
-struct WorkerTask {
-	WorkerTask *    next;
-	WorkerTaskProc *do_work;
-	void *          data;
-	ThreadPool *    pool;
-};
-
-
 struct ThreadPool {
 struct ThreadPool {
 	gbAllocator   allocator;
 	gbAllocator   allocator;
-	BlockingMutex mutex;
-	Condition     task_cond;
-	
+
 	Slice<Thread> threads;
 	Slice<Thread> threads;
-	
-	WorkerTask *task_queue;
-	
-	std::atomic<isize> ready;
-	std::atomic<bool>  stop;
-	
-};
+	std::atomic<bool> running;
 
 
-gb_internal THREAD_PROC(thread_pool_thread_proc);
+	BlockingMutex task_lock;
+	Condition     tasks_available;
+
+	Futex tasks_left;
+};
 
 
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
+	mutex_init(&pool->task_lock);
+	condition_init(&pool->tasks_available);
+
 	pool->allocator = a;
 	pool->allocator = a;
-	pool->stop = false;
-	mutex_init(&pool->mutex);
-	condition_init(&pool->task_cond);
-	
-	slice_init(&pool->threads, a, thread_count);
-	for_array(i, pool->threads) {
+	slice_init(&pool->threads, a, thread_count + 1);
+
+	// setup the main thread
+	thread_init(pool, &pool->threads[0], 0);
+	current_thread = &pool->threads[0];
+
+	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
 		Thread *t = &pool->threads[i];
-		thread_init_and_start(t, thread_pool_thread_proc, pool);
+		thread_init_and_start(pool, t, i);
 	}
 	}
+
+	pool->running = true;
 }
 }
 
 
 gb_internal void thread_pool_destroy(ThreadPool *pool) {
 gb_internal void thread_pool_destroy(ThreadPool *pool) {
-	mutex_lock(&pool->mutex);
-	pool->stop = true;
-	condition_broadcast(&pool->task_cond);
-	mutex_unlock(&pool->mutex);
+	pool->running = false;
 
 
-	for_array(i, pool->threads) {
+	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
 		Thread *t = &pool->threads[i];
+		condition_broadcast(&pool->tasks_available);
 		thread_join_and_destroy(t);
 		thread_join_and_destroy(t);
 	}
 	}
+	for_array(i, pool->threads) {
+		free(pool->threads[i].queue);
+	}
 
 
 	gb_free(pool->allocator, pool->threads.data);
 	gb_free(pool->allocator, pool->threads.data);
-	mutex_destroy(&pool->mutex);
-	condition_destroy(&pool->task_cond);
+	mutex_destroy(&pool->task_lock);
+	condition_destroy(&pool->tasks_available);
 }
 }
 
 
-gb_internal bool thread_pool_queue_empty(ThreadPool *pool) {
-	return pool->task_queue == nullptr;
-}
+void thread_pool_queue_push(Thread *thread, WorkerTask task) {
+	uint64_t capture;
+	uint64_t new_capture;
+	do {
+		capture = thread->head_and_tail.load();
+
+		uint64_t mask = thread->capacity - 1;
+		uint64_t head = (capture >> 32) & mask;
+		uint64_t tail = ((uint32_t)capture) & mask;
+
+		uint64_t new_head = (head + 1) & mask;
+		if (new_head == tail) {
+			GB_PANIC("Thread Queue Full!\n");
+		}
+
+		// This *must* be done in here, to avoid a potential race condition where we no longer own the slot by the time we're assigning
+		thread->queue[head] = task;
+		new_capture = (new_head << 32) | tail;
+	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
 
 
-gb_internal WorkerTask *thread_pool_queue_pop(ThreadPool *pool) {
-	GB_ASSERT(pool->task_queue != nullptr);
-	WorkerTask *task = pool->task_queue;
-	pool->task_queue = task->next;
-	return task;
+	thread->pool->tasks_left.fetch_add(1);
+	condition_broadcast(&thread->pool->tasks_available);
 }
 }
-gb_internal void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) {
-	GB_ASSERT(task != nullptr);
-	task->next = pool->task_queue;
-	pool->task_queue = task;
+
+bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) {
+	uint64_t capture;
+	uint64_t new_capture;
+	do {
+		capture = thread->head_and_tail.load();
+
+		uint64_t mask = thread->capacity - 1;
+		uint64_t head = (capture >> 32) & mask;
+		uint64_t tail = ((uint32_t)capture) & mask;
+
+		uint64_t new_tail = (tail + 1) & mask;
+		if (tail == head) {
+			return false;
+		}
+
+		// Making a copy of the task before we increment the tail, avoiding the same potential race condition as above
+		*task = thread->queue[tail];
+
+		new_capture = (head << 32) | new_tail;
+	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
+
+	return true;
 }
 }
 
 
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
-	GB_ASSERT(proc != nullptr);
-	WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask);
-	if (task == nullptr) {
-		GB_PANIC("Out of memory");
-		return false;
-	}
-	task->pool = pool;
-	task->do_work = proc;
-	task->data = data;
+	WorkerTask task = {};
+	task.do_work = proc;
+	task.data = data;
 		
 		
-	mutex_lock(&pool->mutex);
-	thread_pool_queue_push(pool, task);
-	GB_ASSERT(pool->ready >= 0);
-	pool->ready.fetch_add(1);
-	condition_broadcast(&pool->task_cond);
-	mutex_unlock(&pool->mutex);
+	thread_pool_queue_push(current_thread, task);
 	return true;
 	return true;
 }	
 }	
 
 
+gb_internal void thread_pool_wait(ThreadPool *pool) {
+	WorkerTask task;
 
 
-gb_internal void thread_pool_do_task(WorkerTask *task) {
-	task->do_work(task->data);
-}
+	while (pool->tasks_left) {
 
 
-gb_internal void thread_pool_wait(ThreadPool *pool) {
-	if (pool->threads.count == 0) {
-		while (!thread_pool_queue_empty(pool)) {
-			thread_pool_do_task(thread_pool_queue_pop(pool));
-			pool->ready.fetch_sub(1);
+		// if we've got tasks on our queue, run them
+		while (thread_pool_queue_pop(current_thread, &task)) {
+			task.do_work(task.data);
+			pool->tasks_left.fetch_sub(1);
 		}
 		}
-		GB_ASSERT(pool->ready == 0);
-		return;
-	}
-	for (;;) {
-		mutex_lock(&pool->mutex);
 
 
-		while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) {
-			condition_wait(&pool->task_cond, &pool->mutex);
-		}
-		if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) {
-			mutex_unlock(&pool->mutex);
-			return;
-		}
 
 
-		WorkerTask *task = thread_pool_queue_pop(pool);
-		mutex_unlock(&pool->mutex);
-	
-		thread_pool_do_task(task);
-		if (--pool->ready == 0) {
-			mutex_lock(&pool->mutex);
-			condition_broadcast(&pool->task_cond);
-			mutex_unlock(&pool->mutex);
+		// is this mem-barriered enough?
+		// This *must* be executed in this order, so the futex wakes immediately
+		// if rem_tasks has changed since we checked last, otherwise the program
+		// will permanently sleep
+		Footex rem_tasks = pool->tasks_left.load();
+		if (!rem_tasks) {
+			break;
 		}
 		}
+
+		tpool_wait_on_addr(&pool->tasks_left, rem_tasks);
 	}
 	}
 }
 }
 
 
-
 gb_internal THREAD_PROC(thread_pool_thread_proc) {
 gb_internal THREAD_PROC(thread_pool_thread_proc) {
-	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
-	
+	WorkerTask task;
+	current_thread = thread;
+	ThreadPool *pool = current_thread->pool;
+
 	for (;;) {
 	for (;;) {
-		mutex_lock(&pool->mutex);
+work_start:
+		if (!pool->running) {
+			break;
+		}
 
 
-		while (!pool->stop && thread_pool_queue_empty(pool)) {
-			condition_wait(&pool->task_cond, &pool->mutex);
+		// If we've got tasks to process, work through them
+		size_t finished_tasks = 0;
+		while (thread_pool_queue_pop(current_thread, &task)) {
+			task.do_work(task.data);
+			pool->tasks_left.fetch_sub(1);
+
+			finished_tasks += 1;
 		}
 		}
-		if (pool->stop && thread_pool_queue_empty(pool)) {
-			mutex_unlock(&pool->mutex);
-			return 0;
+		if (finished_tasks > 0 && !pool->tasks_left) {
+			tpool_wake_addr(&pool->tasks_left);
 		}
 		}
 
 
-		WorkerTask *task = thread_pool_queue_pop(pool);
-		mutex_unlock(&pool->mutex);
-	
-		thread_pool_do_task(task);
-		if (--pool->ready == 0) {
-			mutex_lock(&pool->mutex);
-			condition_broadcast(&pool->task_cond);
-			mutex_unlock(&pool->mutex);
+		// If there's still work somewhere and we don't have it, steal it
+		if (pool->tasks_left) {
+			isize idx = current_thread->idx;
+			for_array(i, pool->threads) {
+				if (!pool->tasks_left) {
+					break;
+				}
+
+				idx = (idx + 1) % pool->threads.count;
+				Thread *thread = &pool->threads[idx];
+
+				WorkerTask task;
+				if (!thread_pool_queue_pop(thread, &task)) {
+					continue;
+				}
+
+				task.do_work(task.data);
+				pool->tasks_left.fetch_sub(1);
+
+				if (!pool->tasks_left) {
+					tpool_wake_addr(&pool->tasks_left);
+				}
+
+				goto work_start;
+			}
 		}
 		}
+
+		// if we've done all our work, and there's nothing to steal, go to sleep
+		mutex_lock(&pool->task_lock);
+		condition_wait(&pool->tasks_available, &pool->task_lock);
+		mutex_unlock(&pool->task_lock);
 	}
 	}
-}
+
+	return 0;
+}

+ 148 - 51
src/threading.cpp

@@ -11,24 +11,34 @@ struct RecursiveMutex;
 struct Semaphore;
 struct Semaphore;
 struct Condition;
 struct Condition;
 struct Thread;
 struct Thread;
+struct ThreadPool;
 
 
 #define THREAD_PROC(name) isize name(struct Thread *thread)
 #define THREAD_PROC(name) isize name(struct Thread *thread)
-typedef THREAD_PROC(ThreadProc);
+gb_internal THREAD_PROC(thread_pool_thread_proc);
+
+#define WORKER_TASK_PROC(name) isize name(void *data)
+typedef WORKER_TASK_PROC(WorkerTaskProc);
+
+typedef struct WorkerTask {
+	WorkerTaskProc *do_work;
+	void           *data;
+} WorkerTask;
 
 
 struct Thread {
 struct Thread {
 #if defined(GB_SYSTEM_WINDOWS)
 #if defined(GB_SYSTEM_WINDOWS)
-	void *        win32_handle;
+	void *win32_handle;
 #else
 #else
-	pthread_t     posix_handle;
+	pthread_t posix_handle;
 #endif
 #endif
+	
+	isize idx;
 
 
-	ThreadProc * proc;
-	void *         user_data;
-	isize          user_index;
-	isize volatile return_value;
+	WorkerTask *queue;
+	size_t capacity;
+	std::atomic<uint64_t> head_and_tail;
 
 
-	isize       stack_size;
-	std::atomic<bool> is_running;
+	isize  stack_size;
+	struct ThreadPool *pool;
 };
 };
 
 
 
 
@@ -59,10 +69,9 @@ gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32
 
 
 gb_internal u32  thread_current_id(void);
 gb_internal u32  thread_current_id(void);
 
 
-gb_internal void thread_init_and_start           (Thread *t, ThreadProc *proc, void *data);
-gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size);
+gb_internal void thread_init                     (ThreadPool *pool, Thread *t, isize idx);
+gb_internal void thread_init_and_start           (ThreadPool *pool, Thread *t, isize idx);
 gb_internal void thread_join_and_destroy(Thread *t);
 gb_internal void thread_join_and_destroy(Thread *t);
-gb_internal bool thread_is_running      (Thread const *t);
 gb_internal void thread_set_name        (Thread *t, char const *name);
 gb_internal void thread_set_name        (Thread *t, char const *name);
 
 
 gb_internal void yield_thread(void);
 gb_internal void yield_thread(void);
@@ -325,47 +334,45 @@ gb_internal gb_inline void yield(void) {
 #endif
 #endif
 }
 }
 
 
-gb_internal void private__thread_run(Thread *t) {
-	t->return_value = t->proc(t);
-}
-
 #if defined(GB_SYSTEM_WINDOWS)
 #if defined(GB_SYSTEM_WINDOWS)
-	gb_internal DWORD __stdcall internal_thread_proc(void *arg) {
-		Thread *t = cast(Thread *)arg;
-		t->is_running.store(true);
-		private__thread_run(t);
-		return 0;
-	}
+gb_internal DWORD __stdcall internal_thread_proc(void *arg) {
+	Thread *t = cast(Thread *)arg;
+	thread_pool_thread_proc(t);
+	return 0;
+}
 #else
 #else
-	gb_internal void *internal_thread_proc(void *arg) {
-	#if (GB_SYSTEM_LINUX)
-		// NOTE: Don't permit any signal delivery to threads on Linux.
-		sigset_t mask = {};
-		sigfillset(&mask);
-		GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals");
-	#endif
-		
-		Thread *t = cast(Thread *)arg;
-		t->is_running.store(true);
-		private__thread_run(t);
-		return NULL;
-	}
+gb_internal void *internal_thread_proc(void *arg) {
+#if (GB_SYSTEM_LINUX)
+	// NOTE: Don't permit any signal delivery to threads on Linux.
+	sigset_t mask = {};
+	sigfillset(&mask);
+	GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals");
+#endif
+	
+	Thread *t = cast(Thread *)arg;
+	thread_pool_thread_proc(t);
+	return NULL;
+}
 #endif
 #endif
 
 
-gb_internal void thread_init_and_start(Thread *t, ThreadProc *proc, void *user_data) { thread_init_and_start_with_stack(t, proc, user_data, 0); }
-
-gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *user_data, isize stack_size) {
+gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
 	gb_zero_item(t);
 	gb_zero_item(t);
 #if defined(GB_SYSTEM_WINDOWS)
 #if defined(GB_SYSTEM_WINDOWS)
 	t->win32_handle = INVALID_HANDLE_VALUE;
 	t->win32_handle = INVALID_HANDLE_VALUE;
 #else
 #else
 	t->posix_handle = 0;
 	t->posix_handle = 0;
 #endif
 #endif
-	GB_ASSERT(!t->is_running.load());
-	GB_ASSERT(proc != NULL);
-	t->proc = proc;
-	t->user_data = user_data;
-	t->stack_size = stack_size;
+
+	t->capacity = 1 << 14; // must be a power of 2
+	t->queue = (WorkerTask *)calloc(sizeof(WorkerTask), t->capacity);
+	t->head_and_tail = 0;
+	t->pool = pool;
+	t->idx = idx;
+}
+
+gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) {
+	thread_init(pool, t, idx);
+	isize stack_size = 0;
 
 
 #if defined(GB_SYSTEM_WINDOWS)
 #if defined(GB_SYSTEM_WINDOWS)
 	t->win32_handle = CreateThread(NULL, stack_size, internal_thread_proc, t, 0, NULL);
 	t->win32_handle = CreateThread(NULL, stack_size, internal_thread_proc, t, 0, NULL);
@@ -385,10 +392,6 @@ gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, v
 }
 }
 
 
 gb_internal void thread_join_and_destroy(Thread *t) {
 gb_internal void thread_join_and_destroy(Thread *t) {
-	if (!t->is_running.load()) {
-		return;
-	}
-
 #if defined(GB_SYSTEM_WINDOWS)
 #if defined(GB_SYSTEM_WINDOWS)
 	WaitForSingleObject(t->win32_handle, INFINITE);
 	WaitForSingleObject(t->win32_handle, INFINITE);
 	CloseHandle(t->win32_handle);
 	CloseHandle(t->win32_handle);
@@ -397,11 +400,8 @@ gb_internal void thread_join_and_destroy(Thread *t) {
 	pthread_join(t->posix_handle, NULL);
 	pthread_join(t->posix_handle, NULL);
 	t->posix_handle = 0;
 	t->posix_handle = 0;
 #endif
 #endif
-	t->is_running.store(false);
 }
 }
 
 
-gb_internal bool thread_is_running(Thread const *t) { return t->is_running.load(); }
-
 gb_internal void thread_set_name(Thread *t, char const *name) {
 gb_internal void thread_set_name(Thread *t, char const *name) {
 #if defined(GB_COMPILER_MSVC)
 #if defined(GB_COMPILER_MSVC)
 	#pragma pack(push, 8)
 	#pragma pack(push, 8)
@@ -437,7 +437,104 @@ gb_internal void thread_set_name(Thread *t, char const *name) {
 #endif
 #endif
 }
 }
 
 
+#if defined(GB_SYSTEM_LINUX)
+#include <linux/futex.h>
+#include <sys/syscall.h>
+
+typedef std::atomic<int32_t> Futex;
+typedef volatile int32_t Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+	for (;;) {
+		int ret = syscall(SYS_futex, addr, FUTEX_WAKE, 1, NULL, NULL, 0);
+		if (ret == -1) {
+			perror("Futex wake");
+			GB_PANIC("Failed in futex wake!\n");
+		} else if (ret > 0) {
+			return;
+		}
+	}
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+	for (;;) {
+		int ret = syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0);
+		if (ret == -1) {
+			if (errno != EAGAIN) {
+				perror("Futex wait");
+				GB_PANIC("Failed in futex wait!\n");
+			} else {
+				return;
+			}
+		} else if (ret == 0) {
+			if (*addr != val) {
+				return;
+			}
+		}
+	}
+}
+#elif defined(GB_SYSTEM_OSX)
+
+typedef std::atomic<int64_t> Futex;
+typedef volatile int64_t Footex;
+
+#define UL_COMPARE_AND_WAIT	0x00000001
+#define ULF_NO_ERRNO        0x01000000
+
+int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */
+int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+	for (;;) {
+		int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0);
+		if (ret >= 0) {
+			return;
+		}
+		if (ret == EINTR || ret == EFAULT) {
+			continue;
+		}
+		if (ret == ENOENT) {
+			return;
+		}
+		GB_PANIC("Failed in futex wake!\n");
+	}
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+	for (;;) {
+		int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0);
+		if (ret >= 0) {
+			if (*addr != val) {
+				return;
+			}
+			continue;
+		}
+		if (ret == EINTR || ret == EFAULT) {
+			continue;
+		}
+		if (ret == ENOENT) {
+			return;
+		}
+
+		GB_PANIC("Failed in futex wait!\n");
+	}
+}
+#elif defined(GB_SYSTEM_WINDOWS)
+typedef std::atomic<int64_t> Futex;
+typedef volatile int64_t Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+	WakeByAddressSingle((void *)addr);
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+	for (;;) {
+		int ret = WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE);
+		if (*addr != val) break;
+	}
+}
+#endif
 
 
 #if defined(GB_SYSTEM_WINDOWS)
 #if defined(GB_SYSTEM_WINDOWS)
 	#pragma warning(pop)
 	#pragma warning(pop)
-#endif
+#endif