Browse Source

Begin work on futex-ifying the threading primitives

gingerBill 2 years ago
parent
commit
20d451396d
1 changed files with 133 additions and 39 deletions
  1. 133 39
      src/threading.cpp

+ 133 - 39
src/threading.cpp

@@ -41,11 +41,12 @@ struct Thread {
 	struct ThreadPool *pool;
 };
 
-typedef std::atomic<int32_t> Futex;
-typedef volatile int32_t     Footex;
+typedef std::atomic<i32> Futex;
+typedef volatile i32     Footex;
 
 gb_internal void futex_wait(Futex *addr, Footex val);
 gb_internal void futex_signal(Futex *addr);
+gb_internal void futex_broadcast(Futex *addr);
 
 gb_internal void mutex_init    (BlockingMutex *m);
 gb_internal void mutex_destroy (BlockingMutex *m);
@@ -117,6 +118,82 @@ struct MutexGuard {
 #define MUTEX_GUARD(m) MutexGuard GB_DEFER_3(_mutex_guard_){m}
 
 
+struct RecursiveMutex {
+	Futex owner;
+	i32   recursion;
+};
+gb_internal void mutex_init(RecursiveMutex *m) {
+
+}
+gb_internal void mutex_destroy(RecursiveMutex *m) {
+
+}
+gb_internal void mutex_lock(RecursiveMutex *m) {
+	Futex tid = cast(i32)thread_current_id();
+	for (;;) {
+		i32 prev_owner = 0;
+		m->owner.compare_exchange_strong(prev_owner, tid, std::memory_order_acquire, std::memory_order_acquire);
+		if (prev_owner == 0 || prev_owner == tid) {
+			m->recursion++;
+			// inside the lock
+			return;
+		}
+		futex_wait(&m->owner, prev_owner);
+	}
+}
+gb_internal bool mutex_try_lock(RecursiveMutex *m) {
+	Futex tid = cast(i32)thread_current_id();
+	i32 prev_owner = 0;
+	m->owner.compare_exchange_strong(prev_owner, tid, std::memory_order_acquire, std::memory_order_acquire);
+	if (prev_owner == 0 || prev_owner == tid) {
+		m->recursion++;
+		// inside the lock
+		return true;
+	}
+	return false;
+}
+gb_internal void mutex_unlock(RecursiveMutex *m) {
+	m->recursion--;
+	if (m->recursion != 0) {
+		return;
+	}
+	m->owner.exchange(0, std::memory_order_release);
+	futex_signal(&m->owner);
+	// outside the lock
+}
+
+struct Semaphore {
+	Futex count;
+};
+
+gb_internal void semaphore_init(Semaphore *s) {
+
+}
+gb_internal void semaphore_destroy(Semaphore *s) {
+
+}
+gb_internal void semaphore_post(Semaphore *s, i32 count) {
+	s->count.fetch_add(count, std::memory_order_release);
+	if (s->count == 1) {
+		futex_signal(&s->count);
+	} else {
+		futex_broadcast(&s->count);
+	}
+}
+gb_internal void semaphore_wait(Semaphore *s) {
+	for (;;) {
+		i32 original_count = s->count.load(std::memory_order_relaxed);
+		while (original_count == 0) {
+			futex_wait(&s->count, original_count);
+			original_count = s->count;
+		}
+
+		if (!s->count.compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) {
+			return;
+		}
+	}
+}
+
 #if defined(GB_SYSTEM_WINDOWS)
 	struct BlockingMutex {
 		SRWLOCK srwlock;
@@ -135,42 +212,6 @@ struct MutexGuard {
 		ReleaseSRWLockExclusive(&m->srwlock);
 	}
 
-	struct RecursiveMutex {
-		CRITICAL_SECTION win32_critical_section;
-	};
-	gb_internal void mutex_init(RecursiveMutex *m) {
-		InitializeCriticalSection(&m->win32_critical_section);
-	}
-	gb_internal void mutex_destroy(RecursiveMutex *m) {
-		DeleteCriticalSection(&m->win32_critical_section);
-	}
-	gb_internal void mutex_lock(RecursiveMutex *m) {
-		EnterCriticalSection(&m->win32_critical_section);
-	}
-	gb_internal bool mutex_try_lock(RecursiveMutex *m) {
-		return TryEnterCriticalSection(&m->win32_critical_section) != 0;
-	}
-	gb_internal void mutex_unlock(RecursiveMutex *m) {
-		LeaveCriticalSection(&m->win32_critical_section);
-	}
-
-	struct Semaphore {
-		void *win32_handle;
-	};
-
-	gb_internal void semaphore_init(Semaphore *s) {
-		s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL);
-	}
-	gb_internal void semaphore_destroy(Semaphore *s) {
-		CloseHandle(s->win32_handle);
-	}
-	gb_internal void semaphore_post(Semaphore *s, i32 count) {
-		ReleaseSemaphore(s->win32_handle, count, NULL);
-	}
-	gb_internal void semaphore_wait(Semaphore *s) {
-		WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE);
-	}
-	
 	struct Condition {
 		CONDITION_VARIABLE cond;
 	};
@@ -458,6 +499,18 @@ gb_internal void futex_signal(Futex *addr) {
 	}
 }
 
+gb_internal void futex_broadcast(Futex *addr) {
+	for (;;) {
+		int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT32_MAX, NULL, NULL, 0);
+		if (ret == -1) {
+			perror("Futex wake");
+			GB_PANIC("Failed in futex wake!\n");
+		} else if (ret > 0) {
+			return;
+		}
+	}
+}
+
 gb_internal void futex_wait(Futex *addr, Footex val) {
 	for (;;) {
 		int ret = syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL, 0);
@@ -485,6 +538,10 @@ gb_internal void futex_signal(Futex *addr) {
 	_umtx_op(addr, UMTX_OP_WAKE, 1, 0, 0);
 }
 
+gb_internal void futex_broadcast(Futex *addr) {
+	_umtx_op(addr, UMTX_OP_WAKE, INT32_MAX, 0, 0);
+}
+
 gb_internal void futex_wait(Futex *addr, Footex val) {
 	for (;;) {
 		int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL);
@@ -523,6 +580,23 @@ gb_internal void futex_signal(Futex *addr) {
 	}
 }
 
+
+gb_internal void futex_broadcast(Futex *addr) {
+	for (;;) {
+		int ret = futex((volatile uint32_t *)addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT32_MAX, NULL, NULL);
+		if (ret == -1) {
+			if (errno == ETIMEDOUT || errno == EINTR) {
+				continue;
+			}
+
+			perror("Futex wake");
+			GB_PANIC("futex wake fail");
+		} else if (ret == 1) {
+			return;
+		}
+	}
+}
+
 gb_internal void futex_wait(Futex *addr, Footex val) {
 	for (;;) {
 		int ret = futex((volatile uint32_t *)addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL);
@@ -565,9 +639,25 @@ gb_internal void futex_signal(Futex *addr) {
 	}
 }
 
+gb_internal void futex_broadcast(Futex *addr) {
+	for (;;) {
+		int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0);
+		if (ret >= 0) {
+			return;
+		}
+		if (ret == EINTR || ret == EFAULT) {
+			continue;
+		}
+		if (ret == ENOENT) {
+			return;
+		}
+		GB_PANIC("Failed in futex wake!\n");
+	}
+}
+
 gb_internal void futex_wait(Futex *addr, Footex val) {
 	for (;;) {
-		int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0);
+		int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, addr, val, 0);
 		if (ret >= 0) {
 			if (*addr != val) {
 				return;
@@ -590,6 +680,10 @@ gb_internal void futex_signal(Futex *addr) {
 	WakeByAddressSingle((void *)addr);
 }
 
+gb_internal void futex_broadcast(Futex *addr) {
+	WakeByAddressAll((void *)addr);
+}
+
 gb_internal void futex_wait(Futex *addr, Footex val) {
 	for (;;) {
 		WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE);