Răsfoiți Sursa

Implement `MPMCQueue` for procedure body checking

This is preparation for basic multithreading in the semantic checker
gingerBill 4 ani în urmă
părinte
comite
d8abe7fc4d
4 a modificat fișierele cu 125 adăugiri și 27 ștergeri
  1. 13 24
      src/checker.cpp
  2. 2 3
      src/checker.hpp
  3. 1 0
      src/common.cpp
  4. 109 0
      src/queue.cpp

+ 13 - 24
src/checker.cpp

@@ -945,7 +945,7 @@ bool init_checker(Checker *c, Parser *parser) {
 	init_checker_info(&c->info);
 	c->info.checker = c;
 
-	array_init(&c->procs_to_check, a);
+	gb_mutex_init(&c->procs_with_deferred_to_check_mutex);
 	array_init(&c->procs_with_deferred_to_check, a);
 
 	// NOTE(bill): Is this big enough or too small?
@@ -955,21 +955,20 @@ bool init_checker(Checker *c, Parser *parser) {
 
 	c->builtin_ctx = make_checker_context(c);
 
-	gb_mutex_init(&c->procs_to_check_mutex);
-	gb_mutex_init(&c->procs_with_deferred_to_check_mutex);
+	// NOTE(bill): 1 Mi elements should be enough on average
+	mpmc_init(&c->procs_to_check_queue, heap_allocator(), 1<<20);
 	return true;
 }
 
 void destroy_checker(Checker *c) {
 	destroy_checker_info(&c->info);
 
-	array_free(&c->procs_to_check);
+	gb_mutex_destroy(&c->procs_with_deferred_to_check_mutex);
 	array_free(&c->procs_with_deferred_to_check);
 
 	destroy_checker_context(&c->builtin_ctx);
 
-	gb_mutex_destroy(&c->procs_to_check_mutex);
-	gb_mutex_destroy(&c->procs_with_deferred_to_check_mutex);
+	// mpmc_destroy(&c->procs_to_check_queue);
 }
 
 
@@ -1513,9 +1512,8 @@ void add_type_info_type(CheckerContext *c, Type *t) {
 void check_procedure_later(Checker *c, ProcInfo *info) {
 	GB_ASSERT(info != nullptr);
 	GB_ASSERT(info->decl != nullptr);
-	gb_mutex_lock(&c->procs_to_check_mutex);
-	array_add(&c->procs_to_check, info);
-	gb_mutex_unlock(&c->procs_to_check_mutex);
+
+	mpmc_enqueue(&c->procs_to_check_queue, info);
 }
 
 void check_procedure_later(Checker *c, AstFile *file, Token token, DeclInfo *decl, Type *type, Ast *body, u64 tags) {
@@ -4388,27 +4386,18 @@ void check_test_names(Checker *c) {
 }
 
 void check_procedure_bodies(Checker *c) {
-	// TODO(bill): Make this an actual FIFO queue rather than this monstrosity
-	while (c->procs_to_check.count != 0) {
-		ProcInfo *pi = c->procs_to_check.data[0];
-
-		// Preparing to multithread the procedure checking code
-		#if 0
-		gb_mutex_lock(&c->procs_to_check_mutex);
-		defer (gb_mutex_unlock(&c->procs_to_check_mutex));
-
-		array_ordered_remove(&c->procs_to_check, 0);
+	auto *q = &c->procs_to_check_queue;
+	ProcInfo *pi = nullptr;
+	while (mpmc_dequeue(q, &pi)) {
 		if (pi->decl->parent && pi->decl->parent->entity) {
 			Entity *parent = pi->decl->parent->entity;
+			// NOTE(bill): Only check a nested procedure if its parent's body has been checked first
+			// This is to prevent any possible race conditions in evaluation when multithreaded
 			if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) {
-				array_add(&c->procs_to_check, pi);
+				mpmc_enqueue(q, pi);
 				continue;
 			}
 		}
-		#else
-		array_ordered_remove(&c->procs_to_check, 0);
-		#endif
-
 		check_proc_info(c, pi);
 	}
 }

+ 2 - 3
src/checker.hpp

@@ -351,11 +351,10 @@ struct Checker {
 
 	CheckerContext builtin_ctx;
 
-
-	gbMutex procs_to_check_mutex;
 	gbMutex procs_with_deferred_to_check_mutex;
-	Array<ProcInfo *> procs_to_check;
 	Array<Entity *>   procs_with_deferred_to_check;
+
+	MPMCQueue<ProcInfo *> procs_to_check_queue;
 };
 
 

+ 1 - 0
src/common.cpp

@@ -167,6 +167,7 @@ GB_ALLOCATOR_PROC(heap_allocator_proc) {
 #include "unicode.cpp"
 #include "array.cpp"
 #include "string.cpp"
+#include "queue.cpp"
 
 #define for_array(index_, array_) for (isize index_ = 0; index_ < (array_).count; index_++)
 

+ 109 - 0
src/queue.cpp

@@ -0,0 +1,109 @@
+#include <atomic> // Because I wanted the C++11 memory order semantics, of which gb.h does not offer (because it was a C89 library)
+
+template <typename T>
+struct MPMCQueueNode { // One slot of the queue's ring buffer
+	T data; // Element held by this slot; written by mpmc_enqueue, read by mpmc_dequeue
+	std::atomic<isize> idx; // Sequence number: equals the head index when the slot may be written, and the tail index + 1 when it may be read
+};
+
+typedef char CacheLinePad[64]; // 64 bytes of filler placed between the fields of MPMCQueue
+
+// Multiple Producer Multiple Consumer Queue: a ring of MPMCQueueNode slots with separate atomic cursors for enqueue (head) and dequeue (tail)
+template <typename T>
+struct MPMCQueue {
+	CacheLinePad pad0;
+	isize mask; // Slot count minus one (the count is a power of two), so `index & mask` selects a slot
+	Array<MPMCQueueNode<T>> buffer; // The slots themselves
+	gbMutex mutex; // Locked by mpmc_enqueue while it grows `buffer`
+
+	CacheLinePad pad1; // NOTE(review): presumably keeps head_idx on its own cache line -- confirm
+	std::atomic<isize> head_idx; // Position the next enqueue will claim
+
+	CacheLinePad pad2; // NOTE(review): presumably keeps tail_idx on its own cache line -- confirm
+	std::atomic<isize> tail_idx; // Position the next dequeue will claim
+
+	CacheLinePad pad3;
+};
+
+
+// Initialises `q` as an empty queue whose slots are allocated from `a`.
+// `size` is the requested initial slot count; it is raised to at least 2 and then
+// rounded up with next_pow2 so that `index & mask` can be used to wrap positions.
+template <typename T>
+void mpmc_init(MPMCQueue<T> *q, gbAllocator a, isize size) {
+	// NOTE: A one-slot ring cannot work: after writing position p the slot's idx is p+1,
+	// which is also the value that marks it as free for position p+1, so the next
+	// enqueue would overwrite an element nobody has read yet
+	if (size < 2) {
+		size = 2;
+	}
+	size = next_pow2(size);
+	GB_ASSERT(gb_is_power_of_two(size));
+
+	gb_mutex_init(&q->mutex);
+	q->mask = size-1;
+	array_init(&q->buffer, a, size);
+	for (isize i = 0; i < size; i++) {
+		// Slot i is first writable at position i
+		q->buffer[i].idx.store(i, std::memory_order_relaxed);
+	}
+
+	// Both cursors start at position 0; do not rely on the caller having zeroed the struct
+	q->head_idx.store(0, std::memory_order_relaxed);
+	q->tail_idx.store(0, std::memory_order_relaxed);
+}
+
+// Releases the mutex and the slot buffer owned by `q`.
+template <typename T>
+void mpmc_destroy(MPMCQueue<T> *q) {
+	gb_mutex_destroy(&q->mutex);
+	// NOTE: `buffer` is an Array<T>, so it is released with array_free;
+	// gb_array_free is the gb.h macro for header-prefixed gbArray values and
+	// would look for an allocator in front of the struct
+	array_free(&q->buffer);
+}
+
+
+// Appends a copy of `data` to the queue, doubling the buffer when every slot is in use.
+// Returns true once the element is stored; `false` is only reachable if GB_PANIC
+// returns after a failed resize.
+//
+// NOTE: Only the common path is lock-free. Growing the buffer moves slots while holding
+// `q->mutex`, but the other queue operations never take that mutex, so growth must not
+// run concurrently with any other enqueue or dequeue.
+template <typename T>
+bool mpmc_enqueue(MPMCQueue<T> *q, T const &data) {
+	isize head_idx = q->head_idx.load(std::memory_order_relaxed);
+
+	for (;;) {
+		isize mask = q->mask;
+		auto node = &q->buffer.data[head_idx & mask];
+		isize node_idx = node->idx.load(std::memory_order_acquire);
+		isize diff = node_idx - head_idx;
+
+		if (diff == 0) {
+			// The slot is free for this position: claim it by advancing the head
+			isize next_head_idx = head_idx+1;
+			if (q->head_idx.compare_exchange_weak(head_idx, next_head_idx)) {
+				node->data = data;
+				// Publish the element: mpmc_dequeue waits for idx == position+1
+				node->idx.store(next_head_idx, std::memory_order_release);
+				return true;
+			}
+			// A failed exchange reloaded head_idx, so just go around again
+		} else if (diff < 0) {
+			// The slot still holds the element written one lap ago: the ring is full
+			gb_mutex_lock(&q->mutex);
+			if (q->mask != mask) {
+				// The buffer was already grown while waiting for the mutex
+				gb_mutex_unlock(&q->mutex);
+				continue;
+			}
+
+			isize old_size = q->buffer.count;
+			isize new_size = old_size*2;
+			array_resize(&q->buffer, new_size);
+			if (q->buffer.data == nullptr) {
+				gb_mutex_unlock(&q->mutex);
+				GB_PANIC("Unable to resize enqueue: %td -> %td", old_size, new_size);
+				return false;
+			}
+
+			// Re-home every element for the doubled mask. The element at position p
+			// lived in slot p & (old_size-1) and now belongs in slot p & (new_size-1),
+			// which is either the same slot or its twin old_size slots higher.
+			// Whichever slot of the pair ends up empty becomes writable at position
+			// p+old_size. Without this, elements written after the ring has wrapped
+			// can no longer be found by mpmc_dequeue.
+			for (isize i = 0; i < old_size; i++) {
+				auto lower = &q->buffer.data[i];
+				auto upper = &q->buffer.data[i + old_size];
+				// The ring is full, so every old slot holds idx == position+1
+				isize pos = lower->idx.load(std::memory_order_relaxed) - 1;
+				if (pos & old_size) {
+					upper->data = lower->data;
+					upper->idx.store(pos+1, std::memory_order_relaxed);
+					lower->idx.store(pos+old_size, std::memory_order_relaxed);
+				} else {
+					upper->idx.store(pos+old_size, std::memory_order_relaxed);
+				}
+			}
+			q->mask = new_size-1;
+			gb_mutex_unlock(&q->mutex);
+		} else {
+			// Another producer has already claimed this position; catch up with the head
+			head_idx = q->head_idx.load(std::memory_order_relaxed);
+		}
+	}
+}
+
+
+// Removes the oldest element, copying it into `*data_` when `data_` is non-null.
+// Returns true on success, or false (without waiting) when the slot at the tail
+// has not been filled yet.
+template <typename T>
+bool mpmc_dequeue(MPMCQueue<T> *q, T *data_) {
+	isize pos = q->tail_idx.load(std::memory_order_relaxed);
+
+	while (true) {
+		auto slot = &q->buffer.data[pos & q->mask];
+		isize seq = slot->idx.load(std::memory_order_acquire);
+		isize expected = pos+1;
+
+		if (seq < expected) {
+			// Nothing has been published at this position
+			return false;
+		}
+		if (seq > expected) {
+			// Another consumer already took this position; catch up with the tail
+			pos = q->tail_idx.load(std::memory_order_relaxed);
+			continue;
+		}
+
+		// A failed exchange reloads `pos`, so the next lap re-examines the new tail
+		if (!q->tail_idx.compare_exchange_weak(pos, expected)) {
+			continue;
+		}
+
+		if (data_ != nullptr) {
+			*data_ = slot->data;
+		}
+		// Hand the slot back to producers for the position one lap ahead
+		slot->idx.store(pos + q->mask + 1, std::memory_order_release);
+		return true;
+	}
+}