Browse Source

Implement `MPSCQueue`

gingerBill 2 years ago
parent
commit
c15db05199
3 changed files with 87 additions and 6 deletions
  1. 4 4
      src/checker.cpp
  2. 1 2
      src/checker.hpp
  3. 82 0
      src/queue.cpp

+ 4 - 4
src/checker.cpp

@@ -1161,7 +1161,7 @@ gb_internal void init_checker_info(CheckerInfo *i) {
 	TIME_SECTION("checker info: mpmc queues");
 
 	mpmc_init(&i->entity_queue, a, 1<<20);
-	mpmc_init(&i->definition_queue, a, 1<<20);
+	mpsc_init(&i->definition_queue, a); //, 1<<20);
 	mpmc_init(&i->required_global_variable_queue, a, 1<<10);
 	mpmc_init(&i->required_foreign_imports_through_force_queue, a, 1<<10);
 	mpmc_init(&i->intrinsics_entry_point_usage, a, 1<<10); // just waste some memory here, even if it probably never used
@@ -1182,7 +1182,7 @@ gb_internal void destroy_checker_info(CheckerInfo *i) {
 	array_free(&i->required_foreign_imports_through_force);
 
 	mpmc_destroy(&i->entity_queue);
-	mpmc_destroy(&i->definition_queue);
+	mpsc_destroy(&i->definition_queue);
 	mpmc_destroy(&i->required_global_variable_queue);
 	mpmc_destroy(&i->required_foreign_imports_through_force_queue);
 
@@ -1493,7 +1493,7 @@ gb_internal void add_entity_definition(CheckerInfo *i, Ast *identifier, Entity *
 	GB_ASSERT(entity != nullptr);
 	identifier->Ident.entity = entity;
 	entity->identifier = identifier;
-	mpmc_enqueue(&i->definition_queue, entity);
+	mpsc_enqueue(&i->definition_queue, entity);
 }
 
 gb_internal bool redeclaration_error(String name, Entity *prev, Entity *found) {
@@ -5583,7 +5583,7 @@ gb_internal void check_add_entities_from_queues(Checker *c) {
 gb_internal void check_add_definitions_from_queues(Checker *c) {
 	isize cap = c->info.definitions.count + c->info.definition_queue.count.load(std::memory_order_relaxed);
 	array_reserve(&c->info.definitions, cap);
-	for (Entity *e; mpmc_dequeue(&c->info.definition_queue, &e); /**/) {
+	for (Entity *e; mpsc_dequeue(&c->info.definition_queue, &e); /**/) {
 		array_add(&c->info.definitions, e);
 	}
 }

+ 1 - 2
src/checker.hpp

@@ -303,7 +303,6 @@ struct UntypedExprInfo {
 };
 
 typedef PtrMap<Ast *, ExprInfo *> UntypedExprInfoMap; 
-typedef MPMCQueue<ProcInfo *> ProcBodyQueue;
 
 enum ObjcMsgKind : u32 {
 	ObjcMsg_normal,
@@ -380,7 +379,7 @@ struct CheckerInfo {
 
 	// NOTE(bill): These are actually MPSC queues
 	// TODO(bill): Convert them to be MPSC queues
-	MPMCQueue<Entity *> definition_queue;
+	MPSCQueue<Entity *> definition_queue;
 	MPMCQueue<Entity *> entity_queue;
 	MPMCQueue<Entity *> required_global_variable_queue;
 	MPMCQueue<Entity *> required_foreign_imports_through_force_queue;

+ 82 - 0
src/queue.cpp

@@ -1,3 +1,85 @@
+template <typename T>
+struct MPSCNode {
+	std::atomic<MPSCNode<T> *> next;
+	T value;
+};
+
+//
+// Multiple Producer Single Consumer Lockless Queue
+// URL: https://www.1024cores.net
+//
+template <typename T>
+struct MPSCQueue {
+	std::atomic<MPSCNode<T> *> head;
+	std::atomic<MPSCNode<T> *> tail;
+	std::atomic<isize> count;
+	MPSCNode<T> sentinel;
+	gbAllocator allocator;
+};
+
+template <typename T> gb_internal void         mpsc_init   (MPSCQueue<T> *q, gbAllocator const &allocator);
+template <typename T> gb_internal void         mpsc_destroy(MPSCQueue<T> *q);
+template <typename T> gb_internal isize        mpsc_enqueue(MPSCQueue<T> *q, T const &value);
+template <typename T> gb_internal bool         mpsc_dequeue(MPSCQueue<T> *q, T *value_);
+template <typename T> gb_internal MPSCNode<T> *mpsc_tail   (MPSCQueue<T> *q);
+
+template <typename T>
+gb_internal void mpsc_init(MPSCQueue<T> *q, gbAllocator const &allocator) {
+	q->allocator = allocator;
+	q->count.store(0, std::memory_order_relaxed);
+	q->head.store(&q->sentinel, std::memory_order_relaxed);
+	q->tail.store(&q->sentinel, std::memory_order_relaxed);
+	q->sentinel.next.store(nullptr, std::memory_order_relaxed);
+}
+
+template <typename T>
+gb_internal void mpsc_destroy(MPSCQueue<T> *q) {
+	while (mpsc_dequeue(q, (T *)nullptr)) {}
+	// DO NOTHING for the time being
+}
+
+
+template <typename T>
+gb_internal MPSCNode<T> *mpsc_alloc_node(MPSCQueue<T> *q, T const &value) {
+	auto node = gb_alloc_item(q->allocator, MPSCNode<T>);
+	node->value = value;
+	return node;
+}
+
+template <typename T>
+gb_internal isize mpsc_enqueue(MPSCQueue<T> *q, MPSCNode<T> *node) {
+	node->next.store(nullptr, std::memory_order_relaxed);
+	auto prev = q->head.exchange(node, std::memory_order_acq_rel);
+	prev->next.store(node, std::memory_order_release);
+	isize count = 1 + q->count.fetch_add(1, std::memory_order_acq_rel);
+	return count;
+}
+
+template <typename T>
+gb_internal isize mpsc_enqueue(MPSCQueue<T> *q, T const &value) {
+	auto node = mpsc_alloc_node(q, value);
+	return mpsc_enqueue(q, node);
+}
+
+
+template <typename T>
+gb_internal bool mpsc_dequeue(MPSCQueue<T> *q, T *value_) {
+	auto tail = q->tail.load(std::memory_order_relaxed);
+	auto next = tail->next.load(std::memory_order_relaxed);
+	if (next) {
+		q->tail.store(next, std::memory_order_relaxed);
+		// `tail` is now "dead" and needs to be "freed"
+		if (*value_) *value_ = next->value;
+		q->count.fetch_sub(1, std::memory_order_acq_rel);
+		return true;
+	}
+	return false;
+}
+
+////////////////////////////
+
+
+
 #define MPMC_CACHE_LINE_SIZE 64
 
 typedef std::atomic<i32> MPMCQueueAtomicIdx;