Browse Source

Improve `sync.Channel` behaviour

gingerBill 5 years ago
parent
commit
96ad6d2084
1 changed files with 28 additions and 18 deletions
  1. 28 18
      core/sync/channel.odin

+ 28 - 18
core/sync/channel.odin

@@ -1,5 +1,6 @@
 package sync
 
+// import "core:fmt"
 import "core:mem"
 import "core:time"
 import "core:intrinsics"
@@ -39,11 +40,11 @@ channel_cap :: proc(ch: $C/Channel($T)) -> int {
 
 channel_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) {
 	msg := msg;
-	_ = raw_channel_send_impl(ch._internal, &msg, false, loc);
+	_ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc);
 }
 channel_try_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) -> bool {
 	msg := msg;
-	return raw_channel_send_impl(ch._internal, &msg, true, loc);
+	return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc);
 }
 
 channel_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T) {
@@ -68,6 +69,10 @@ channel_try_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T
 channel_is_nil :: proc(ch: $C/Channel($T)) -> bool {
 	return ch._internal == nil;
 }
+channel_is_open :: proc(ch: $C/Channel($T)) -> bool {
+	c := ch._internal;
+	return c != nil && !c.closed;
+}
 
 
 channel_eq :: proc(a, b: $C/Channel($T)) -> bool {
@@ -108,19 +113,14 @@ channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) {
 }
 
 
-channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, open: bool) {
+channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, ok: bool) {
 	c := ch._internal;
-	switch {
-	case c == nil:
+	if c == nil {
 		return;
-	case intrinsics.atomic_load(&c.closed):
-		if channel_can_recv(ch) {
-			val = channel_recv(ch);
-			open = true;
-		}
-	case:
-		val = channel_recv(ch);
-		open = true;
+	}
+
+	if !c.closed || c.len > 0 {
+		val, ok = channel_recv(ch), true;
 	}
 	return;
 }
@@ -129,8 +129,11 @@ channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, open: bool) {
 channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
 	backing: [64]int;
 	candidates := backing[:];
-	if len(channels) > len(backing) {
-		candidates = make([]int, len(channels), context.temp_allocator);
+	cap := len(channels);
+	if cap > len(backing) {
+		candidates = make([]int, cap, context.temp_allocator);
+	} else {
+		candidates = candidates[:cap];
 	}
 
 	count := u32(0);
@@ -298,7 +301,7 @@ raw_channel_destroy :: proc(c: ^Raw_Channel) {
 }
 
 
-raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc := #caller_location) -> bool {
+raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool {
 	send :: proc(c: ^Raw_Channel, src: rawptr) {
 		dst := uintptr(c.data) + uintptr(c.write * c.elem_size);
 		mem.copy(rawptr(dst), src, c.elem_size);
@@ -315,7 +318,7 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc
 
 	mutex_lock(&c.mutex);
 	if c.cap > 0 {
-		if no_block && c.len >= c.cap {
+		if !block && c.len >= c.cap {
 			mutex_unlock(&c.mutex);
 			return false;
 		}
@@ -323,6 +326,8 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc
 		for c.len >= c.cap {
 			condition_wait_for(&c.cond);
 		}
+	} else if c.len > 0 {
+		condition_wait_for(&c.cond);
 	}
 
 	send(c, msg);
@@ -352,7 +357,12 @@ raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_locat
 	}
 	intrinsics.atomic_store(&c.ready, false);
 	recv(c, res, loc);
-	if c.cap > 0 && c.len == c.cap - 1 {
+	if c.cap > 0 {
+		if c.len == c.cap - 1 {
+			// NOTE(bill): Only signal on the last one
+			condition_signal(&c.cond);
+		}
+	} else {
 		condition_signal(&c.cond);
 	}
 }