Browse Source

Fix sync.Channel code; add `thread.run_with_poly_data` and `run_with_poly_data(2|3|4)` procedures

gingerBill 4 years ago
parent
commit
95b94a0f56
4 changed files with 113 additions and 24 deletions
  1. 11 13
      core/sync/channel.odin
  2. 93 7
      core/thread/thread.odin
  3. 3 2
      core/thread/thread_unix.odin
  4. 6 2
      core/thread/thread_windows.odin

+ 11 - 13
core/sync/channel.odin

@@ -145,7 +145,7 @@ channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) {
 }
 
 
-channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D >= .Both {
+channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both {
 	c := ch._internal;
 	if c == nil {
 		return;
@@ -287,18 +287,19 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc :=
 		for c.len >= c.cap {
 			condition_wait_for(&c.cond);
 		}
-	} else if c.len > 0 {
+	} else if c.len > 0 { // TODO(bill): determine correct behaviour
 		if !block {
 			return false;
 		}
 		condition_wait_for(&c.cond);
+	} else if c.len == 0 && !block {
+		return false;
 	}
 
 	send(c, msg);
 	condition_signal(&c.cond);
 	raw_channel_wait_queue_signal(c.recvq);
 
-
 	return true;
 }
 
@@ -564,7 +565,7 @@ select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int)
 			q.state = &state;
 			raw_channel_wait_queue_insert(&c.recvq, q);
 		}
-		raw_channel_wait_queue_wait_on(&state);
+		raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
 		for c, i in channels {
 			q := &queues[i];
 			raw_channel_wait_queue_remove(&c.recvq, q);
@@ -618,7 +619,7 @@ select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int)
 			q.state = &state;
 			raw_channel_wait_queue_insert(&c.recvq, q);
 		}
-		raw_channel_wait_queue_wait_on(&state);
+		raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
 		for c, i in channels {
 			q := &queues[i];
 			raw_channel_wait_queue_remove(&c.recvq, q);
@@ -813,13 +814,12 @@ select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_che
 select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) {
 	switch len(channels) {
 	case 0:
-		index = 0;
+		index = -1;
 		return;
 	case 1:
-		if c := channels[0]; channel_can_recv(c) {
+		ok: bool;
+		if msg, ok = channel_try_recv(channels[0]); ok {
 			index = 0;
-			msg = channel_recv(c);
-			return;
 		}
 		return;
 	}
@@ -850,15 +850,13 @@ select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: i
 }
 
 select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) {
+	index = -1;
 	switch len(channels) {
 	case 0:
-		index = 0;
 		return;
 	case 1:
-		if c := channels[0]; channel_can_send(c) {
+		if channel_try_send(channels[0], msg) {
 			index = 0;
-			channel_send(c, msg);
-			return;
 		}
 		return;
 	}

+ 93 - 7
core/thread/thread.odin

@@ -2,17 +2,26 @@ package thread
 
 import "core:runtime"
 import "core:sync"
-import "core:intrinsics"
+import "core:mem"
+import "intrinsics"
+
+_ :: intrinsics;
 
 Thread_Proc :: #type proc(^Thread);
 
+MAX_USER_ARGUMENTS :: 8;
+
 Thread :: struct {
-	using specific:   Thread_Os_Specific,
-	procedure:        Thread_Proc,
-	data:             rawptr,
-	user_index:       int,
+	using specific: Thread_Os_Specific,
+	procedure:      Thread_Proc,
+	data:           rawptr,
+	user_index:     int,
+	user_args:      [MAX_USER_ARGUMENTS]rawptr,
 
 	init_context: Maybe(runtime.Context),
+
+
+	creation_allocator: mem.Allocator,
 }
 
 #assert(size_of(Thread{}.user_index) == size_of(uintptr));
@@ -34,17 +43,94 @@ run :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil, priority :=
 run_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) {
 	thread_proc :: proc(t: ^Thread) {
 		fn := cast(proc(rawptr))t.data;
-		data := rawptr(uintptr(t.user_index));
+		assert(t.user_index >= 1);
+		data := t.user_args[0];
 		fn(data);
 		destroy(t);
 	}
 	t := create(thread_proc, priority);
 	t.data = rawptr(fn);
-	t.user_index = int(uintptr(data));
+	t.user_index = 1;
+	t.user_args = data;
+	t.init_context = init_context;
+	start(t);
+}
+
+run_with_poly_data :: proc(data: $T, fn: proc(data: T), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
+	where intrinsics.type_is_pointer(T) || size_of(T) == size_of(rawptr) {
+	thread_proc :: proc(t: ^Thread) {
+		fn := cast(proc(rawptr))t.data;
+		assert(t.user_index >= 1);
+		data := t.user_args[0];
+		fn(data);
+		destroy(t);
+	}
+	t := create(thread_proc, priority);
+	t.data = rawptr(fn);
+	t.user_index = 1;
+	t.user_args[0] = transmute(rawptr)data;
+	t.init_context = init_context;
+	start(t);
+}
+
+run_with_poly_data2 :: proc(arg1: $T1, arg2: $T2, fn: proc(T1, T2), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
+	where intrinsics.type_is_pointer(T1) || size_of(T1) == size_of(rawptr),
+	      intrinsics.type_is_pointer(T2) || size_of(T2) == size_of(rawptr) {
+	thread_proc :: proc(t: ^Thread) {
+		fn := cast(proc(rawptr, rawptr))t.data;
+		assert(t.user_index >= 2);
+		fn(t.user_args[0], t.user_args[1]);
+		destroy(t);
+	}
+	t := create(thread_proc, priority);
+	t.data = rawptr(fn);
+	t.user_index = 2;
+	t.user_args[0] = transmute(rawptr)arg1;
+	t.user_args[1] = transmute(rawptr)arg2;
 	t.init_context = init_context;
 	start(t);
 }
 
+run_with_poly_data3 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, fn: proc(arg1: T1, arg2: T2, arg3: T3), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
+	where intrinsics.type_is_pointer(T1) || size_of(T1) == size_of(rawptr),
+	      intrinsics.type_is_pointer(T2) || size_of(T2) == size_of(rawptr),
+	      intrinsics.type_is_pointer(T3) || size_of(T3) == size_of(rawptr) {
+	thread_proc :: proc(t: ^Thread) {
+		fn := cast(proc(rawptr, rawptr, rawptr))t.data;
+		assert(t.user_index >= 3);
+		fn(t.user_args[0], t.user_args[1], t.user_args[2]);
+		destroy(t);
+	}
+	t := create(thread_proc, priority);
+	t.data = rawptr(fn);
+	t.user_index = 3;
+	t.user_args[0] = transmute(rawptr)arg1;
+	t.user_args[1] = transmute(rawptr)arg2;
+	t.user_args[2] = transmute(rawptr)arg3;
+	t.init_context = init_context;
+	start(t);
+}
+run_with_poly_data4 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, arg4: $T4, fn: proc(arg1: T1, arg2: T2, arg3: T3, arg4: T4), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
+	where intrinsics.type_is_pointer(T1) || size_of(T1) == size_of(rawptr),
+	      intrinsics.type_is_pointer(T2) || size_of(T2) == size_of(rawptr),
+	      intrinsics.type_is_pointer(T3) || size_of(T3) == size_of(rawptr) {
+	thread_proc :: proc(t: ^Thread) {
+		fn := cast(proc(rawptr, rawptr, rawptr))t.data;
+		assert(t.user_index >= 3);
+		fn(t.user_args[0], t.user_args[1], t.user_args[2]);
+		destroy(t);
+	}
+	t := create(thread_proc, priority);
+	t.data = rawptr(fn);
+	t.user_index = 3;
+	t.user_args[0] = transmute(rawptr)arg1;
+	t.user_args[1] = transmute(rawptr)arg2;
+	t.user_args[2] = transmute(rawptr)arg3;
+	t.init_context = init_context;
+	start(t);
+}
+
+
 
 create_and_start :: proc(fn: Thread_Proc, init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) -> ^Thread {
 	t := create(fn, priority);

+ 3 - 2
core/thread/thread_unix.odin

@@ -85,6 +85,7 @@ create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^T
 	if thread == nil {
 		return nil;
 	}
+	thread.creation_allocator = context.allocator;
 
 	// Set thread priority.
 	policy: i32;
@@ -106,7 +107,7 @@ create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^T
 	sync.mutex_init(&thread.start_mutex);
 	sync.condition_init(&thread.start_gate, &thread.start_mutex);
 	if unix.pthread_create(&thread.unix_thread, &attrs, __linux_thread_entry_proc, thread) != 0 {
-		free(thread);
+		free(thread, thread.creation_allocator);
 		return nil;
 	}
 	thread.procedure = procedure;
@@ -172,7 +173,7 @@ join_multiple :: proc(threads: ..^Thread) {
 destroy :: proc(t: ^Thread) {
 	join(t);
 	t.unix_thread = {};
-	free(t);
+	free(t, t.creation_allocator);
 }
 
 

+ 6 - 2
core/thread/thread_windows.odin

@@ -49,10 +49,14 @@ create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^T
 
 
 	thread := new(Thread);
+	if thread == nil {
+		return nil;
+	}
+	thread.creation_allocator = context.allocator;
 
 	win32_thread := win32.CreateThread(nil, 0, __windows_thread_entry_proc, thread, win32.CREATE_SUSPENDED, &win32_thread_id);
 	if win32_thread == nil {
-		free(thread);
+		free(thread, thread.creation_allocator);
 		return nil;
 	}
 	thread.procedure       = procedure;
@@ -111,7 +115,7 @@ join_multiple :: proc(threads: ..^Thread) {
 
 destroy :: proc(thread: ^Thread) {
 	join(thread);
-	free(thread);
+	free(thread, thread.creation_allocator);
 }
 
 terminate :: proc(using thread : ^Thread, exit_code: u32) {