Browse Source

Update threading.cpp

avanspector 1 year ago
parent
commit
d4d9f55556
1 changed files with 102 additions and 43 deletions
  1. 102 43
      src/threading.cpp

+ 102 - 43
src/threading.cpp

@@ -840,86 +840,131 @@ gb_internal void futex_wait(Futex *f, Footex val) {
 
 
 #include <pthread.h>
 #include <pthread.h>
 #include <atomic>
 #include <atomic>
+
+struct _Spinlock {
+	std::atomic_flag state;
+
+	void init() {
+		state.clear();
+	}
+
+	void lock() {
+		while (state.test_and_set(std::memory_order_acquire)) {
+			#if defined(GB_CPU_X86)
+			_mm_pause();
+			#else
+			(void)0; // spin...
+			#endif
+		}
+	}
+
+	void unlock() {
+		state.clear(std::memory_order_release);
+	}
+};
+
+struct Futex_Waitq;
  
  
-struct Futex_Wait_Node {
+struct Futex_Waiter {
+	_Spinlock lock;
 	pthread_t thread;
 	pthread_t thread;
 	Futex *futex;
 	Futex *futex;
-	Futex_Wait_Node *prev, *next;	
+	Futex_Waitq *waitq;
+	Futex_Waiter *prev, *next;	
 };
 };
  
  
-struct Futex_Wait_Queue {
-	std::atomic_flag spinlock;
-	Futex_Wait_Node list;
+struct Futex_Waitq {
+	_Spinlock lock;
+	Futex_Waiter list;
  
  
-	void lock() {
-		while (spinlock.test_and_set(std::memory_order_acquire)) {
-			; // spin...
-		}	
-	}
- 
-	void unlock() {
-		spinlock.clear(std::memory_order_release);
+	void init() {
+		auto head = &list;
+		head->prev = head->next = head;
 	}
 	}
 };
 };
 
 
 // FIXME: This approach may scale badly in the future,
 // FIXME: This approach may scale badly in the future,
 // possible solution - hash map (leads to deadlocks now).
 // possible solution - hash map (leads to deadlocks now).
  
  
-Futex_Wait_Queue g_waitq = {
-	.spinlock = ATOMIC_FLAG_INIT,
+Futex_Waitq g_waitq = {
+	.lock = ATOMIC_FLAG_INIT,
 	.list = {
 	.list = {
 		.prev = &g_waitq.list,
 		.prev = &g_waitq.list,
 		.next = &g_waitq.list,
 		.next = &g_waitq.list,
 	},
 	},
 };
 };
  
  
-Futex_Wait_Queue *get_wait_queue(Futex *f) {
+Futex_Waitq *get_waitq(Futex *f) {
 	// Future hash map method...
 	// Future hash map method...
 	return &g_waitq;
 	return &g_waitq;
 }
 }
  
  
 void futex_signal(Futex *f) {
 void futex_signal(Futex *f) {
-	auto waitq = get_wait_queue(f);
+	auto waitq = get_waitq(f);
  
  
-	waitq->lock();
+	waitq->lock.lock();
  
  
 	auto head = &waitq->list;
 	auto head = &waitq->list;
 	for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
 	for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
-		if (waiter->futex == f) {
-			pthread_kill(waiter->thread, SIGCONT);
-			break;
-		}	
+		if (waiter->futex != f) {
+			continue;
+		}
+		waitq->lock.unlock();
+		pthread_kill(waiter->thread, SIGCONT);
+		return;
 	}
 	}
  
  
-	waitq->unlock();
+	waitq->lock.unlock();
 }
 }
  
  
 void futex_broadcast(Futex *f) {
 void futex_broadcast(Futex *f) {
-	auto waitq = get_wait_queue(f);
+	auto waitq = get_waitq(f);
  
  
-	waitq->lock();
+	waitq->lock.lock();
  
  
 	auto head = &waitq->list;
 	auto head = &waitq->list;
 	for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
 	for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
-		if (waiter->futex == f) {
+		if (waiter->futex != f) {
+			continue;
+		}
+		if (waiter->next == head) {
+			waitq->lock.unlock();
 			pthread_kill(waiter->thread, SIGCONT);
 			pthread_kill(waiter->thread, SIGCONT);
-		}	
+			return;
+		} else {
+			pthread_kill(waiter->thread, SIGCONT);
+		}
 	}
 	}
  
  
-	waitq->unlock();
+	waitq->lock.unlock();
 }
 }
  
  
 void futex_wait(Futex *f, Footex val) {
 void futex_wait(Futex *f, Footex val) {
-	auto waitq = get_wait_queue(f);
- 
-	waitq->lock();
- 
-	auto head = &waitq->list;
-	Futex_Wait_Node waiter;
+	Futex_Waiter waiter;
 	waiter.thread = pthread_self();
 	waiter.thread = pthread_self();
 	waiter.futex = f;
 	waiter.futex = f;
-	waiter.prev = head;
-	waiter.next = head->next;
+
+	auto waitq = get_waitq(f);
+	while (waitq->lock.state.test_and_set(std::memory_order_acquire)) {
+		if (f->load(std::memory_order_relaxed) != val) {
+			return;
+		}
+		#if defined(GB_CPU_X86)
+		_mm_pause();
+		#else
+		(void)0; // spin...
+		#endif
+	}
+
+	waiter.waitq = waitq;
+	waiter.lock.init();
+	waiter.lock.lock();
+ 
+	auto head = &waitq->list;
+	waiter.prev = head->prev;
+	waiter.next = head;
+	waiter.prev->next = &waiter;
+	waiter.next->prev = &waiter;
  
  
 	waiter.prev->next = &waiter;
 	waiter.prev->next = &waiter;
 	waiter.next->prev = &waiter;
 	waiter.next->prev = &waiter;
@@ -928,12 +973,25 @@ void futex_wait(Futex *f, Footex val) {
 	sigemptyset(&mask);
 	sigemptyset(&mask);
 	sigaddset(&mask, SIGCONT);
 	sigaddset(&mask, SIGCONT);
 	pthread_sigmask(SIG_BLOCK, &mask, &old_mask);
 	pthread_sigmask(SIG_BLOCK, &mask, &old_mask);
- 
-	if (*f == val) {
-		waitq->unlock();
-		int sig;
-		sigwait(&mask, &sig);
-		waitq->lock();
+
+	if (f->load(std::memory_order_relaxed) == val) {
+			waiter.lock.unlock();
+			waitq->lock.unlock();
+
+			int sig;
+			sigwait(&mask, &sig);
+
+			waitq->lock.lock();
+			waiter.lock.lock();
+
+			while (waitq != waiter.waitq) {
+				auto req = waiter.waitq;
+				waiter.lock.unlock();
+				waitq->lock.unlock();
+				waitq = req;
+				waitq->lock.lock();
+				waiter.lock.lock();
+			}
 	}
 	}
  
  
 	waiter.prev->next = waiter.next;
 	waiter.prev->next = waiter.next;
@@ -941,7 +999,8 @@ void futex_wait(Futex *f, Footex val) {
  
  
 	pthread_sigmask(SIG_SETMASK, &old_mask, NULL);
 	pthread_sigmask(SIG_SETMASK, &old_mask, NULL);
  
  
-	waitq->unlock();
+	waiter.lock.unlock();
+	waitq->lock.unlock();
 }
 }
 
 
 #endif
 #endif