소스 검색

fix thread data races

Dale Weiler 3 년 전
부모
커밋
0e6de5673b
1개의 변경된 파일40개의 추가작업 그리고 80개의 파일을 삭제
  1. 40 80
      core/thread/thread_unix.odin

+ 40 - 80
core/thread/thread_unix.odin

@@ -4,33 +4,22 @@ package thread
 
 import "core:runtime"
 import "core:intrinsics"
-import "core:sync"
+import sync "core:sync/sync2"
 import "core:sys/unix"
 
+Thread_State :: enum u8 {
+	Started,
+	Joined,
+	Done,
+}
+
 // NOTE(tetra): Aligned here because of core/unix/pthread_linux.odin/pthread_t.
 // Also see core/sys/darwin/mach_darwin.odin/semaphore_t.
 Thread_Os_Specific :: struct #align 16 {
 	unix_thread: unix.pthread_t, // NOTE: very large on Darwin, small on Linux.
-
-	// NOTE: pthread has a proc to query this, but it is marked
-	// as non-portable ("np") so we do this instead.
-	done: bool,
-
-	// since libpthread doesn't seem to have a way to create a thread
-	// in a suspended state, we have it wait on this gate, which we
-	// signal to start it.
-	// destroyed after thread is started.
-	start_gate:  sync.Condition,
-	start_mutex: sync.Mutex,
-
-	// if true, the thread has been started and the start_gate has been destroyed.
-	started: bool,
-
-	// NOTE: with pthreads, it is undefined behavior for multiple threads
-	// to call join on the same thread at the same time.
-	// this value is atomically updated to detect this.
-	// See the comment in `join`.
-	already_joined: bool,
+	cond:        sync.Cond,
+	mutex:       sync.Mutex,
+	flags:       bit_set[Thread_State; u8],
 }
 //
 // Creates a thread which will run the given procedure.
@@ -38,26 +27,31 @@ Thread_Os_Specific :: struct #align 16 {
 //
 _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^Thread {
 	__linux_thread_entry_proc :: proc "c" (t: rawptr) -> rawptr {
-		context = runtime.default_context()
 		t := (^Thread)(t)
-		sync.condition_wait_for(&t.start_gate)
-		sync.condition_destroy(&t.start_gate)
-		sync.mutex_destroy(&t.start_mutex)
-		t.start_gate = {}
-		t.start_mutex = {}
-
-		context = t.init_context.? or_else runtime.default_context()
-		
+
+		context = runtime.default_context()
+
+		sync.lock(&t.mutex)
+
 		t.id = sync.current_thread_id()
+
+		if .Started not_in t.flags {
+			sync.wait(&t.cond, &t.mutex)
+		}
+
+		init_context := t.init_context
+		context =	init_context.? or_else runtime.default_context()
+
 		t.procedure(t)
 
-		if t.init_context == nil {
-			if context.temp_allocator.data == &runtime.global_default_temp_allocator_data {
-				runtime.default_temp_allocator_destroy(auto_cast context.temp_allocator.data)
-			}
+		t.flags += { .Done }
+
+		sync.unlock(&t.mutex)
+
+		if init_context == nil && context.temp_allocator.data == &runtime.global_default_temp_allocator_data {
+			runtime.default_temp_allocator_destroy(auto_cast context.temp_allocator.data)
 		}
 
-		intrinsics.atomic_store(&t.done, true)
 		return nil
 	}
 
@@ -76,9 +70,6 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^
 		return nil
 	}
 	thread.creation_allocator = context.allocator
-	
-	sync.mutex_init(&thread.start_mutex)
-	sync.condition_init(&thread.start_gate, &thread.start_mutex)
 
 	// Set thread priority.
 	policy: i32
@@ -97,65 +88,36 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^
 	res = unix.pthread_attr_setschedparam(&attrs, &params)
 	assert(res == 0)
 
+	thread.procedure = procedure
 	if unix.pthread_create(&thread.unix_thread, &attrs, __linux_thread_entry_proc, thread) != 0 {
 		free(thread, thread.creation_allocator)
-		
-		sync.condition_destroy(&thread.start_gate)
-		sync.mutex_destroy(&thread.start_mutex)
 		return nil
 	}
-	thread.procedure = procedure
-
 
 	return thread
 }
 
 _start :: proc(t: ^Thread) {
-	if intrinsics.atomic_xchg(&t.started, true) {
-		return
-	}
-	sync.condition_signal(&t.start_gate)
+	sync.lock(&t.mutex)
+	t.flags += { .Started }
+	sync.signal(&t.cond)
+	sync.unlock(&t.mutex)
 }
 
 _is_done :: proc(t: ^Thread) -> bool {
-	return intrinsics.atomic_load(&t.done)
+	return intrinsics.atomic_and(&t.flags, { .Done }) != nil
 }
 
 _join :: proc(t: ^Thread) {
-	if unix.pthread_equal(unix.pthread_self(), t.unix_thread) {
-		return
-	}
-	// if unix.pthread_self().x == t.unix_thread.x do return;
-
-	// NOTE(tetra): It's apparently UB for multiple threads to join the same thread
-	// at the same time.
-	// If someone else already did, spin until the thread dies.
-	// See note on `already_joined` field.
-	// TODO(tetra): I'm not sure if we should do this, or panic, since I'm not
-	// sure it makes sense to need to join from multiple threads?
-	if intrinsics.atomic_xchg(&t.already_joined, true) {
-		for {
-			if intrinsics.atomic_load(&t.done) {
-				return
-			}
-			intrinsics.cpu_relax()
-		}
-	}
+	sync.guard(&t.mutex)
 
-	// NOTE(tetra): If we're already dead, don't bother calling to pthread_join as that
-	// will just return 3 (ESRCH).
-	// We do this instead because I don't know if there is a danger
-	// that you may join a different thread from the one you called join on,
-	// if the thread handle is reused.
-	if intrinsics.atomic_load(&t.done) {
+	if .Joined in t.flags || unix.pthread_equal(unix.pthread_self(), t.unix_thread) {
 		return
 	}
 
-	ret_val: rawptr
-	_ = unix.pthread_join(t.unix_thread, &ret_val)
-	if !intrinsics.atomic_load(&t.done) {
-		panic("thread not done after join")
-	}
+	unix.pthread_join(t.unix_thread, nil)
+
+	t.flags += { .Joined }
 }
 
 _join_multiple :: proc(threads: ..^Thread) {
@@ -164,14 +126,12 @@ _join_multiple :: proc(threads: ..^Thread) {
 	}
 }
 
-
 _destroy :: proc(t: ^Thread) {
 	_join(t)
 	t.unix_thread = {}
 	free(t, t.creation_allocator)
 }
 
-
 _terminate :: proc(t: ^Thread, exit_code: int) {
 	// TODO(bill)
 }