
Move more of `gb.h`'s synchronization code into common.cpp

gingerBill 4 years ago
parent commit 5c4d95d539
4 changed files with 120 additions and 50 deletions
  1. src/checker.cpp      +10 -10
  2. src/checker.hpp      +2  -2
  3. src/common.cpp       +93 -23
  4. src/thread_pool.cpp  +15 -15
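
For orientation before the per-file diffs: after this commit, common.cpp owns a small cross-platform synchronization layer in place of the `gb.h` wrappers. The following is a sketch of that surface reconstructed from the hunks below, not the verbatim source; only the names shown in the diffs are taken from the commit.

    struct BlockingMutex;   // SRWLOCK on Win32, pthread_mutex_t elsewhere
    struct RecursiveMutex;  // CRITICAL_SECTION on Win32, PTHREAD_MUTEX_RECURSIVE elsewhere
    struct Semaphore;       // Win32 semaphore HANDLE, mach semaphore_t, or POSIX sem_t

    // RecursiveMutex operations (BlockingMutex has matching init/destroy/lock/unlock overloads):
    void mutex_init    (RecursiveMutex *m);
    void mutex_destroy (RecursiveMutex *m);
    void mutex_lock    (RecursiveMutex *m);
    bool mutex_try_lock(RecursiveMutex *m);
    void mutex_unlock  (RecursiveMutex *m);

    void semaphore_init   (Semaphore *s);
    void semaphore_destroy(Semaphore *s);
    void semaphore_post   (Semaphore *s, i32 count); // make `count` permits available
    void semaphore_wait   (Semaphore *s);            // block until one permit is available
    void semaphore_release(Semaphore *s);            // defined as semaphore_post(s, 1)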

+ 10 - 10
src/checker.cpp

@@ -883,7 +883,7 @@ void init_checker_info(CheckerInfo *i) {
 	mutex_init(&i->identifier_uses_mutex);
 	mutex_init(&i->foreign_mutex);
 
-	gb_semaphore_init(&i->collect_semaphore);
+	semaphore_init(&i->collect_semaphore);
 
 
 #undef TIME_SECTION
@@ -976,7 +976,7 @@ void init_checker(Checker *c) {
 
 	// NOTE(bill): 1 Mi elements should be enough on average
 	mpmc_init(&c->procs_to_check_queue, heap_allocator(), 1<<20);
-	gb_semaphore_init(&c->procs_to_check_semaphore);
+	semaphore_init(&c->procs_to_check_semaphore);
 
 	mpmc_init(&c->global_untyped_queue, a, 1<<20);
 
@@ -991,7 +991,7 @@ void destroy_checker(Checker *c) {
 	destroy_checker_context(&c->builtin_ctx);
 
 	mpmc_destroy(&c->procs_to_check_queue);
-	gb_semaphore_destroy(&c->procs_to_check_semaphore);
+	semaphore_destroy(&c->procs_to_check_semaphore);
 
 	mpmc_destroy(&c->global_untyped_queue);
 }
@@ -4136,7 +4136,7 @@ void check_with_workers(Checker *c, gbThreadProc *proc, isize total_count) {
 		worker_count = 0;
 	}
 
-	gb_semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count);
+	semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count);
 
 	if (worker_count == 0) {
 		ThreadProcCheckerSection section_all = {};
@@ -4174,7 +4174,7 @@ void check_with_workers(Checker *c, gbThreadProc *proc, isize total_count) {
 	dummy_main_thread.user_data = thread_data+worker_count;
 	proc(&dummy_main_thread);
 
-	gb_semaphore_wait(&c->info.collect_semaphore);
+	semaphore_wait(&c->info.collect_semaphore);
 
 	for (isize i = 0; i < worker_count; i++) {
 		gb_thread_destroy(threads+i);
@@ -4208,7 +4208,7 @@ GB_THREAD_PROC(thread_proc_collect_entities) {
 
 	map_destroy(&untyped);
 
-	gb_semaphore_release(&c->info.collect_semaphore);
+	semaphore_release(&c->info.collect_semaphore);
 	return 0;
 }
 
@@ -4249,7 +4249,7 @@ GB_THREAD_PROC(thread_proc_check_export_entities) {
 
 	map_destroy(&untyped);
 
-	gb_semaphore_release(&c->info.collect_semaphore);
+	semaphore_release(&c->info.collect_semaphore);
 	return 0;
 }
 
@@ -4735,7 +4735,7 @@ GB_THREAD_PROC(thread_proc_body) {
 
 	map_destroy(&untyped);
 
-	gb_semaphore_release(&c->procs_to_check_semaphore);
+	semaphore_release(&c->procs_to_check_semaphore);
 
 	return 0;
 }
@@ -4795,7 +4795,7 @@ void check_procedure_bodies(Checker *c) {
 	GB_ASSERT(total_queued == original_queue_count);
 
 
-	gb_semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count);
+	semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count);
 
 	gbThread *threads = gb_alloc_array(permanent_allocator(), gbThread, worker_count);
 	for (isize i = 0; i < worker_count; i++) {
@@ -4809,7 +4809,7 @@ void check_procedure_bodies(Checker *c) {
 	dummy_main_thread.user_data = thread_data+worker_count;
 	thread_proc_body(&dummy_main_thread);
 
-	gb_semaphore_wait(&c->procs_to_check_semaphore);
+	semaphore_wait(&c->procs_to_check_semaphore);
 
 	for (isize i = 0; i < worker_count; i++) {
 		gb_thread_destroy(threads+i);
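
The renamed calls keep ordinary counting-semaphore semantics on every backend: `semaphore_post(s, n)` publishes `n` permits and each `semaphore_wait` consumes one, with `semaphore_release` as shorthand for posting a single permit. As a generic illustration of that contract (a hypothetical sketch, not the checker's exact protocol; `worker_done`, `example_worker`, and `NUM_WORKERS` are invented names), a coordinator can join N workers by consuming one permit per worker:

    enum { NUM_WORKERS = 4 };
    gb_global Semaphore worker_done; // hypothetical; one permit per finished worker

    GB_THREAD_PROC(example_worker) {
        // ... do the actual work ...
        semaphore_release(&worker_done); // post exactly one "finished" permit
        return 0;
    }

    void join_example(void) {
        semaphore_init(&worker_done);
        // ... start NUM_WORKERS threads running example_worker ...
        for (isize i = 0; i < NUM_WORKERS; i++) {
            semaphore_wait(&worker_done); // consume one permit per finished worker
        }
        semaphore_destroy(&worker_done);
    }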

+ 2 - 2
src/checker.hpp

@@ -294,7 +294,7 @@ struct CheckerInfo {
 	// NOTE(bill): If the semantic checker (check_proc_body) is ever to be multithreaded,
 	// these variables will be a point of contention
 
-	gbSemaphore collect_semaphore;
+	Semaphore collect_semaphore;
 
 	UntypedExprInfoMap global_untyped; // NOTE(bill): This needs to be a map and not on the Ast
 	                                   // as it needs to be iterated across afterwards
@@ -390,7 +390,7 @@ struct Checker {
 	MPMCQueue<Entity *> procs_with_deferred_to_check;
 
 	ProcBodyQueue procs_to_check_queue;
-	gbSemaphore procs_to_check_semaphore;
+	Semaphore procs_to_check_semaphore;
 
 	// TODO(bill): Technically MPSC queue
 	MPMCQueue<UntypedExprInfo> global_untyped_queue;

+ 93 - 23
src/common.cpp

@@ -47,6 +47,47 @@
 	void mutex_unlock(BlockingMutex *m) {
 		ReleaseSRWLockExclusive(&m->srwlock);
 	}
+
+	struct RecursiveMutex {
+		CRITICAL_SECTION win32_critical_section;
+	};
+	void mutex_init(RecursiveMutex *m) {
+		InitializeCriticalSection(&m->win32_critical_section);
+	}
+	void mutex_destroy(RecursiveMutex *m) {
+		DeleteCriticalSection(&m->win32_critical_section);
+	}
+	void mutex_lock(RecursiveMutex *m) {
+		EnterCriticalSection(&m->win32_critical_section);
+	}
+	bool mutex_try_lock(RecursiveMutex *m) {
+		return TryEnterCriticalSection(&m->win32_critical_section) != 0;
+	}
+	void mutex_unlock(RecursiveMutex *m) {
+		LeaveCriticalSection(&m->win32_critical_section);
+	}
+
+	struct Semaphore {
+		void *win32_handle;
+	};
+
+	gb_inline void semaphore_init(Semaphore *s) {
+		s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL);
+	}
+	gb_inline void semaphore_destroy(Semaphore *s) {
+		CloseHandle(s->win32_handle);
+	}
+	gb_inline void semaphore_post(Semaphore *s, i32 count) {
+		ReleaseSemaphore(s->win32_handle, count, NULL);
+	}
+	gb_inline void semaphore_wait(Semaphore *s) {
+		WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE);
+	}
+
+	gb_inline void semaphore_release(Semaphore *s) {
+		semaphore_post(s, 1);
+	}
+
 #else
 	struct BlockingMutex {
 		pthread_mutex_t pthread_mutex;
@@ -66,26 +107,55 @@
 	void mutex_unlock(BlockingMutex *m) {
 		pthread_mutex_unlock(&m->pthread_mutex);
 	}
-#endif
 
-struct RecursiveMutex {
-	gbMutex mutex;
-};
-void mutex_init(RecursiveMutex *m) {
-	gb_mutex_init(&m->mutex);
-}
-void mutex_destroy(RecursiveMutex *m) {
-	gb_mutex_destroy(&m->mutex);
-}
-void mutex_lock(RecursiveMutex *m) {
-	gb_mutex_lock(&m->mutex);
-}
-bool mutex_try_lock(RecursiveMutex *m) {
-	return !!gb_mutex_try_lock(&m->mutex);
-}
-void mutex_unlock(RecursiveMutex *m) {
-	gb_mutex_unlock(&m->mutex);
-}
+	struct RecursiveMutex {
+		pthread_mutex_t pthread_mutex;
+		pthread_mutexattr_t pthread_mutexattr;
+	};
+	void mutex_init(RecursiveMutex *m) {
+		pthread_mutexattr_init(&m->pthread_mutexattr);
+		pthread_mutexattr_settype(&m->pthread_mutexattr, PTHREAD_MUTEX_RECURSIVE);
+		pthread_mutex_init(&m->pthread_mutex, &m->pthread_mutexattr);
+	}
+	void mutex_destroy(RecursiveMutex *m) {
+		pthread_mutex_destroy(&m->pthread_mutex);
+	}
+	void mutex_lock(RecursiveMutex *m) {
+		pthread_mutex_lock(&m->pthread_mutex);
+	}
+	bool mutex_try_lock(RecursiveMutex *m) {
+		return pthread_mutex_trylock(&m->pthread_mutex) == 0;
+	}
+	void mutex_unlock(RecursiveMutex *m) {
+		pthread_mutex_unlock(&m->pthread_mutex);
+	}
+
+	#if defined(GB_SYSTEM_OSX)
+		struct Semaphore {
+			semaphore_t osx_handle;
+		};
+
+		gb_inline void semaphore_init   (Semaphore *s)            { semaphore_create(mach_task_self(), &s->osx_handle, SYNC_POLICY_FIFO, 0); }
+		gb_inline void semaphore_destroy(Semaphore *s)            { semaphore_destroy(mach_task_self(), s->osx_handle); }
+		gb_inline void semaphore_post   (Semaphore *s, i32 count) { while (count --> 0) semaphore_signal(s->osx_handle); }
+		gb_inline void semaphore_wait   (Semaphore *s)            { semaphore_wait(s->osx_handle); }
+	#elif defined(GB_SYSTEM_UNIX)
+		struct Semaphore {
+			sem_t unix_handle;
+		};
+
+		gb_inline void semaphore_init   (Semaphore *s)            { sem_init(&s->unix_handle, 0, 0); }
+		gb_inline void semaphore_destroy(Semaphore *s)            { sem_destroy(&s->unix_handle); }
+		gb_inline void semaphore_post   (Semaphore *s, i32 count) { while (count --> 0) sem_post(&s->unix_handle); }
+		gb_inline void semaphore_wait   (Semaphore *s)            { int i; do { i = sem_wait(&s->unix_handle); } while (i == -1 && errno == EINTR); }
+	#else
+	#error
+	#endif
+
+	gb_inline void semaphore_release(Semaphore *s) {
+		semaphore_post(s, 1);
+	}
+#endif
 
 
 
@@ -585,7 +655,7 @@ struct Temp_Allocator {
 	isize curr_offset;
 	gbAllocator backup_allocator;
 	Array<void *> leaked_allocations;
-	gbMutex mutex;
+	BlockingMutex mutex;
 };
 
 gb_global Temp_Allocator temporary_allocator_data = {};
@@ -596,7 +666,7 @@ void temp_allocator_init(Temp_Allocator *s, isize size) {
 	s->len = size;
 	s->curr_offset = 0;
 	s->leaked_allocations.allocator = s->backup_allocator;
-	gb_mutex_init(&s->mutex);
+	mutex_init(&s->mutex);
 }
 
 void *temp_allocator_alloc(Temp_Allocator *s, isize size, isize alignment) {
@@ -639,8 +709,8 @@ GB_ALLOCATOR_PROC(temp_allocator_proc) {
 	Temp_Allocator *s = cast(Temp_Allocator *)allocator_data;
 	GB_ASSERT_NOT_NULL(s);
 
-	gb_mutex_lock(&s->mutex);
-	defer (gb_mutex_unlock(&s->mutex));
+	mutex_lock(&s->mutex);
+	defer (mutex_unlock(&s->mutex));
 
 	switch (type) {
 	case gbAllocation_Alloc:
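
Both RecursiveMutex backends introduced above (CRITICAL_SECTION on Win32, PTHREAD_MUTEX_RECURSIVE via pthreads) let the owning thread re-acquire a lock it already holds, which is what distinguishes them from the SRWLOCK/plain pthread_mutex_t behind BlockingMutex. A minimal re-entrancy sketch under that assumption (`g_cache_mutex` and `lookup_or_insert` are invented names; the `defer` guard mirrors the one in temp_allocator_proc):

    gb_global RecursiveMutex g_cache_mutex; // hypothetical example state

    void *lookup_or_insert(char const *key, bool allow_recursion) {
        mutex_lock(&g_cache_mutex);
        defer (mutex_unlock(&g_cache_mutex)); // same lock/defer-unlock style as temp_allocator_proc

        if (allow_recursion) {
            // Re-entering from the owning thread is fine with a recursive mutex;
            // doing the same with a BlockingMutex would deadlock.
            return lookup_or_insert(key, false);
        }
        // ... perform the actual lookup/insert using `key` ...
        return NULL;
    }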

+ 15 - 15
src/thread_pool.cpp

@@ -11,10 +11,10 @@ struct WorkerTask {
 
 
 struct ThreadPool {
-	BlockingMutex mutex;
-	gbSemaphore   sem_available;
-	gbAtomic32    processing_work_count;
-	bool          is_running;
+	BlockingMutex    mutex;
+	Semaphore        sem_available;
+	std::atomic<i32> processing_work_count;
+	bool             is_running;
 
 	gbAllocator allocator;
 
@@ -40,7 +40,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count
 	pool->thread_count = gb_max(thread_count, 0);
 	pool->threads = gb_alloc_array(a, gbThread, pool->thread_count);
 	mutex_init(&pool->mutex);
-	gb_semaphore_init(&pool->sem_available);
+	semaphore_init(&pool->sem_available);
 	pool->is_running = true;
 
 	pool->worker_prefix_len = 0;
@@ -76,7 +76,7 @@ void thread_pool_start(ThreadPool *pool) {
 void thread_pool_join(ThreadPool *pool) {
 	pool->is_running = false;
 
-	gb_semaphore_post(&pool->sem_available, cast(i32)pool->thread_count);
+	semaphore_post(&pool->sem_available, cast(i32)pool->thread_count);
 
 	gb_yield();
 
@@ -90,7 +90,7 @@ void thread_pool_join(ThreadPool *pool) {
 void thread_pool_destroy(ThreadPool *pool) {
 	thread_pool_join(pool);
 
-	gb_semaphore_destroy(&pool->sem_available);
+	semaphore_destroy(&pool->sem_available);
 	mutex_destroy(&pool->mutex);
 	gb_free(pool->allocator, pool->threads);
 	pool->thread_count = 0;
@@ -106,21 +106,21 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
 	task.data = data;
 
 	mpmc_enqueue(&pool->tasks, task);
-	gb_semaphore_post(&pool->sem_available, 1);
+	semaphore_post(&pool->sem_available, 1);
 	mutex_unlock(&pool->mutex);
 }
 
 bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) {
 	bool got_task = false;
 	if (mpmc_dequeue(&pool->tasks, task)) {
-		gb_atomic32_fetch_add(&pool->processing_work_count, +1);
+		pool->processing_work_count.fetch_add(1);
 		got_task = true;
 	}
 	return got_task;
 }
 void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) {
 	task->result = task->do_work(task->data);
-	gb_atomic32_fetch_add(&pool->processing_work_count, -1);
+	pool->processing_work_count.fetch_sub(1);
 }
 
 void thread_pool_wait_to_process(ThreadPool *pool) {
@@ -131,16 +131,16 @@ void thread_pool_wait_to_process(ThreadPool *pool) {
 		}
 		return;
 	}
-	while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) {
+	while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || pool->processing_work_count.load() != 0) {
 		WorkerTask task = {};
 		if (thread_pool_try_and_pop_task(pool, &task)) {
 			thread_pool_do_work(pool, &task);
 		}
 
 		// Safety-kick
-		while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) {
+		while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && pool->processing_work_count.load() == 0) {
 			mutex_lock(&pool->mutex);
-			gb_semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed));
+			semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed));
 			mutex_unlock(&pool->mutex);
 		}
 
@@ -154,7 +154,7 @@ void thread_pool_wait_to_process(ThreadPool *pool) {
 GB_THREAD_PROC(worker_thread_internal) {
 	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
 	while (pool->is_running) {
-		gb_semaphore_wait(&pool->sem_available);
+		semaphore_wait(&pool->sem_available);
 
 		WorkerTask task = {};
 		if (thread_pool_try_and_pop_task(pool, &task)) {
@@ -162,7 +162,7 @@ GB_THREAD_PROC(worker_thread_internal) {
 		}
 	}
 	// Cascade
-	gb_semaphore_release(&pool->sem_available);
+	semaphore_release(&pool->sem_available);
 
 	return 0;
 }
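
The gbAtomic32 counter becomes a std::atomic<i32>. The defaulted fetch_add/fetch_sub/load calls use sequentially consistent ordering, a safe (if conservative) stand-in for the gb.h atomics, while the queue-length reads above keep their explicit memory_order_relaxed. A self-contained sketch of the in-flight accounting pattern (the surrounding ThreadPool is elided; the i32 typedef is recreated here to match the codebase's fixed-width alias):

    #include <atomic>
    #include <cstdint>

    typedef std::int32_t i32; // stand-in for the codebase's typedef

    std::atomic<i32> processing_work_count{0};

    void begin_task(void) { processing_work_count.fetch_add(1); } // a task was popped
    void end_task(void)   { processing_work_count.fetch_sub(1); } // a task finished

    bool pool_has_work_in_flight(void) {
        // seq_cst load by default; observes the fetch_add/fetch_sub above
        return processing_work_count.load() != 0;
    }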