Browse Source

Modify MPMCQueue behaviour to use `i32` over `isize`; Correct cache line padding within MPMCQueue

gingerBill 4 years ago
parent
commit
af32aba7fc
2 changed files with 46 additions and 35 deletions
  1. 13 1
      src/common.cpp
  2. 33 34
      src/queue.cpp

+ 13 - 1
src/common.cpp

@@ -763,7 +763,19 @@ isize next_pow2_isize(isize n) {
 	n++;
 	return n;
 }
-
+u32 next_pow2_u32(u32 n) {
+	if (n == 0) {
+		return 0;
+	}
+	n--;
+	n |= n >> 1;
+	n |= n >> 2;
+	n |= n >> 4;
+	n |= n >> 8;
+	n |= n >> 16;
+	n++;
+	return n;
+}
 
 
 i32 bit_set_count(u32 x) {

+ 33 - 34
src/queue.cpp

@@ -1,40 +1,41 @@
 template <typename T>
 struct MPMCQueueNode {
-	std::atomic<isize> idx;
-	T                  data;
+	std::atomic<i32> idx;
+	T                data;
 };
 
 template <typename T>
 struct MPMCQueueNodeNonAtomic {
-	isize idx;
-	T     data;
+	i32 idx;
+	T   data;
 };
 
-typedef char CacheLinePad[64];
-
 // Multiple Producer Multiple Consumer Queue
 template <typename T>
 struct MPMCQueue {
-	CacheLinePad pad0;
-	isize mask;
+	static size_t const PAD0_OFFSET = (sizeof(i32) + sizeof(Array<MPMCQueueNode<T>>) + sizeof(BlockingMutex) + sizeof(i32));
+
+	i32 mask;
 	Array<MPMCQueueNode<T>> buffer;
 	BlockingMutex mutex;
-	std::atomic<isize> count;
-
-	CacheLinePad pad1;
-	std::atomic<isize> head_idx;
+	std::atomic<i32> count;
 
-	CacheLinePad pad2;
-	std::atomic<isize> tail_idx;
+	char pad0[(128 - PAD0_OFFSET) % 64];
+	std::atomic<i32> head_idx;
 
-	CacheLinePad pad3;
+	char pad1[64 - sizeof(i32)];
+	std::atomic<i32> tail_idx;
 };
 
 
 template <typename T>
-void mpmc_init(MPMCQueue<T> *q, gbAllocator a, isize size) {
-	size = gb_max(size, 8);
-	size = next_pow2_isize(size);
+void mpmc_init(MPMCQueue<T> *q, gbAllocator a, isize size_i) {
+	if (size_i < 8) {
+		size_i = 8;
+	}
+	GB_ASSERT(size_i < I32_MAX);
+	i32 size = cast(i32)size_i;
+	size = next_pow2(size);
 	GB_ASSERT(gb_is_power_of_two(size));
 
 	mutex_init(&q->mutex);
@@ -43,7 +44,7 @@ void mpmc_init(MPMCQueue<T> *q, gbAllocator a, isize size) {
 
 	// NOTE(bill): pretend it's not atomic for performance
 	auto *raw_data = cast(MPMCQueueNodeNonAtomic<T> *)q->buffer.data;
-	for (isize i = 0; i < size; i += 8) {
+	for (i32 i = 0; i < size; i += 8) {
 		raw_data[i+0].idx = i+0;
 		raw_data[i+1].idx = i+1;
 		raw_data[i+2].idx = i+2;
@@ -63,20 +64,18 @@ void mpmc_destroy(MPMCQueue<T> *q) {
 
 
 template <typename T>
-isize mpmc_enqueue(MPMCQueue<T> *q, T const &data) {
-	if (q->mask == 0) {
-		return -1;
-	}
+i32 mpmc_enqueue(MPMCQueue<T> *q, T const &data) {
+	GB_ASSERT(q->mask != 0);
 
-	isize head_idx = q->head_idx.load(std::memory_order_relaxed);
+	i32 head_idx = q->head_idx.load(std::memory_order_relaxed);
 
 	for (;;) {
 		auto node = &q->buffer.data[head_idx & q->mask];
-		isize node_idx = node->idx.load(std::memory_order_acquire);
-		isize diff = node_idx - head_idx;
+		i32 node_idx = node->idx.load(std::memory_order_acquire);
+		i32 diff = node_idx - head_idx;
 
 		if (diff == 0) {
-			isize next_head_idx = head_idx+1;
+			i32 next_head_idx = head_idx+1;
 			if (q->head_idx.compare_exchange_weak(head_idx, next_head_idx)) {
 				node->data = data;
 				node->idx.store(next_head_idx, std::memory_order_release);
@@ -84,8 +83,8 @@ isize mpmc_enqueue(MPMCQueue<T> *q, T const &data) {
 			}
 		} else if (diff < 0) {
 			mutex_lock(&q->mutex);
-			isize old_size = q->buffer.count;
-			isize new_size = old_size*2;
+			i32 old_size = cast(i32)q->buffer.count;
+			i32 new_size = old_size*2;
 			array_resize(&q->buffer, new_size);
 			if (q->buffer.data == nullptr) {
 				GB_PANIC("Unable to resize enqueue: %td -> %td", old_size, new_size);
@@ -94,7 +93,7 @@ isize mpmc_enqueue(MPMCQueue<T> *q, T const &data) {
 			}
 			// NOTE(bill): pretend it's not atomic for performance
 			auto *raw_data = cast(MPMCQueueNodeNonAtomic<T> *)q->buffer.data;
-			for (isize i = old_size; i < new_size; i++) {
+			for (i32 i = old_size; i < new_size; i++) {
 				raw_data[i].idx = i;
 			}
 			q->mask = new_size-1;
@@ -111,15 +110,15 @@ bool mpmc_dequeue(MPMCQueue<T> *q, T *data_) {
 		return false;
 	}
 
-	isize tail_idx = q->tail_idx.load(std::memory_order_relaxed);
+	i32 tail_idx = q->tail_idx.load(std::memory_order_relaxed);
 
 	for (;;) {
 		auto node = &q->buffer.data[tail_idx & q->mask];
-		isize node_idx = node->idx.load(std::memory_order_acquire);
-		isize diff = node_idx - (tail_idx+1);
+		i32 node_idx = node->idx.load(std::memory_order_acquire);
+		i32 diff = node_idx - (tail_idx+1);
 
 		if (diff == 0) {
-			isize next_tail_idx = tail_idx+1;
+			i32 next_tail_idx = tail_idx+1;
 			if (q->tail_idx.compare_exchange_weak(tail_idx, next_tail_idx)) {
 				if (data_) *data_ = node->data;
 				node->idx.store(tail_idx + q->mask + 1, std::memory_order_release);