Răsfoiți Sursa

Revert channel.odin

gingerBill 5 ani în urmă
părinte
comite
b633a42bc2
1 a modificat fișierele cu 101 adăugiri și 90 ștergeri
  1. 101 90
      core/sync/channel.odin

+ 101 - 90
core/sync/channel.odin

@@ -2,7 +2,6 @@ package sync
 
 import "core:mem"
 import "core:time"
-import "core:fmt"
 import "core:math/rand"
 
 _, _ :: time, rand;
@@ -18,16 +17,13 @@ _Channel_Internal :: struct(T: typeid) {
 
 	unbuffered_msg: T, // Will be used as the backing to the queue if no `cap` is given
 
-	mutex:   Mutex,
-	r_mutex: Mutex,
-	w_mutex: Mutex,
-	r_cond:  Condition,
-	w_cond:  Condition,
+	mutex:  Mutex,
+	r_cond: Condition,
+	w_cond: Condition,
 
-	is_buffered: bool,
-	is_closed:   bool,
-	r_waiting:   int,
-	w_waiting:   int,
+	closed:    bool,
+	r_waiting: int,
+	w_waiting: int,
 }
 
 channel_init :: proc(c: ^$C/Channel($T), cap: int = 0, allocator := context.allocator) {
@@ -42,20 +38,16 @@ channel_make :: proc($T: typeid, cap: int = 0, allocator := context.allocator) -
 	ch.allocator = allocator;
 
 	mutex_init(&ch.mutex);
-	mutex_init(&ch.r_mutex);
-	mutex_init(&ch.w_mutex);
 	condition_init(&ch.r_cond, &ch.mutex);
 	condition_init(&ch.w_cond, &ch.mutex);
-	ch.is_closed = false;
+	ch.closed = false;
 	ch.r_waiting = 0;
 	ch.w_waiting = 0;
 	ch.unbuffered_msg = T{};
 
 	if cap > 0 {
-		ch.is_buffered = true;
 		ch.queue = make([dynamic]T, 0, cap, ch.allocator);
 	} else {
-		ch.is_buffered = false;
 		d := mem.Raw_Dynamic_Array{
 			data = &ch.unbuffered_msg,
 			len  = 0,
@@ -75,8 +67,6 @@ channel_destroy :: proc(ch: $C/Channel($T)) {
 	}
 
 	mutex_destroy(&ch.mutex);
-	mutex_destroy(&ch.r_mutex);
-	mutex_destroy(&ch.w_mutex);
 	condition_destroy(&ch.r_cond);
 	condition_destroy(&ch.w_cond);
 	free(ch.internal, ch.allocator);
@@ -85,8 +75,8 @@ channel_destroy :: proc(ch: $C/Channel($T)) {
 channel_close :: proc(ch: $C/Channel($T)) -> (ok: bool) {
 	mutex_lock(&ch.mutex);
 
-	if !ch.is_closed {
-		ch.is_closed = true;
+	if !ch.closed {
+		ch.closed = true;
 		condition_broadcast(&ch.r_cond);
 		condition_broadcast(&ch.w_cond);
 		ok = true;
@@ -99,45 +89,25 @@ channel_close :: proc(ch: $C/Channel($T)) -> (ok: bool) {
 channel_write :: proc(ch: $C/Channel($T), msg: T) -> (ok: bool) {
 	mutex_lock(&ch.mutex);
 	defer mutex_unlock(&ch.mutex);
-	// fmt.println("channel_write");
-	// defer fmt.println("channel_write done");
 
-	if ch.is_closed {
+	if ch.closed {
 		return;
 	}
 
-	for !channel_can_write(ch) {
+
+	for len(ch.queue) == cap(ch.queue) {
 		ch.w_waiting += 1;
 		condition_wait_for(&ch.w_cond);
 		ch.w_waiting -= 1;
 	}
 
-	if ch.is_buffered {
-		if len(ch.queue) < cap(ch.queue) {
-			append(&ch.queue, msg);
-			ok = true;
-		}
-
-		if ch.r_waiting > 0 {
-			condition_signal(&ch.r_cond);
-		}
-	} else {
-		for len(ch.queue) == cap(ch.queue) {
-			ch.w_waiting += 1;
-			condition_wait_for(&ch.w_cond);
-			ch.w_waiting -= 1;
-		}
-		assert(len(ch.queue) < cap(ch.queue));
+	if len(ch.queue) < cap(ch.queue) {
 		append(&ch.queue, msg);
 		ok = true;
-		assert(ch.w_waiting >= 0);
-		ch.w_waiting += 1;
-
-		if ch.r_waiting > 0 {
-			condition_signal(&ch.r_cond);
-		}
+	}
 
-		condition_wait_for(&ch.w_cond);
+	if ch.r_waiting > 0 {
+		condition_signal(&ch.r_cond);
 	}
 
 	return;
@@ -146,41 +116,27 @@ channel_write :: proc(ch: $C/Channel($T), msg: T) -> (ok: bool) {
 channel_read :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) #optional_ok {
 	mutex_lock(&ch.mutex);
 	defer mutex_unlock(&ch.mutex);
-	// fmt.println("channel_read");
-	// defer fmt.println("channel_read done");
 
-	if ch.is_closed {
-		return;
-	}
-	for !channel_can_read(ch) {
+	for len(ch.queue) == 0 {
+		if ch.closed {
+			return;
+		}
+
 		ch.r_waiting += 1;
 		condition_wait_for(&ch.r_cond);
 		ch.r_waiting -= 1;
 	}
-	if ch.is_closed {
-		return;
-	}
 
-	if ch.is_buffered {
-		assert(len(ch.queue) > 0);
-		msg, ok = pop_front_safe(&ch.queue);
-
-		if ch.w_waiting > 0 {
-			condition_signal(&ch.w_cond);
-		}
-	} else {
-		assert(ch.w_waiting > 0);
-		assert(len(ch.queue) > 0);
-		msg, ok = pop_front_safe(&ch.queue);
+	msg, ok = pop_front(&ch.queue);
 
-		ch.w_waiting -= 1;
+	if ch.w_waiting > 0 {
 		condition_signal(&ch.w_cond);
 	}
 
 	return;
 }
 
-channel_len :: proc(ch: $C/Channel($T)) -> (size: int) {
+channel_size :: proc(ch: $C/Channel($T)) -> (size: int) {
 	if channel_is_buffered(ch) {
 		mutex_lock(&ch.mutex);
 		size = len(ch.queue);
@@ -191,56 +147,111 @@ channel_len :: proc(ch: $C/Channel($T)) -> (size: int) {
 
 channel_is_closed :: proc(ch: $C/Channel($T)) -> bool {
 	mutex_lock(&ch.mutex);
-	closed := ch.is_closed;
+	closed := ch.closed;
 	mutex_unlock(&ch.mutex);
 	return closed;
 }
 
 channel_is_buffered :: proc(ch: $C/Channel($T)) -> bool {
-	return ch.is_buffered;
+	q := transmute(mem.Raw_Dynamic_Array)ch.queue;
+	return q.cap != 0 && (q.data != &ch.unbuffered_msg);
 }
 
 channel_can_write :: proc(ch: $C/Channel($T)) -> bool {
 	mutex_lock(&ch.mutex);
 	defer mutex_unlock(&ch.mutex);
-	if ch.is_closed {
-		return false;
-	}
-	if ch.is_buffered {
-		return len(ch.queue) < cap(ch.queue);
-	}
-	return ch.r_waiting > 0;
+	return len(ch.queue) < cap(ch.queue);
 }
 
 channel_can_read :: proc(ch: $C/Channel($T)) -> bool {
 	mutex_lock(&ch.mutex);
 	defer mutex_unlock(&ch.mutex);
-	if ch.is_buffered {
-		return len(ch.queue) > 0;
-	}
-	return ch.w_waiting > 0;
+	return len(ch.queue) > 0;
 }
 
 channel_can_read_write :: proc(ch: $C/Channel($T)) -> bool {
 	mutex_lock(&ch.mutex);
 	defer mutex_unlock(&ch.mutex);
-	if ch.is_buffered {
-		return 0 < len(ch.queue) && len(ch.queue) < cap(ch.queue);
-	}
-	return ch.r_waiting > 0 && ch.w_waiting > 0;
+	return 0 < len(ch.queue) && len(ch.queue) < cap(ch.queue);
 }
 
 channel_iterator :: proc(ch: $C/Channel($T)) -> (elem: T, ok: bool) {
 	mutex_lock(&ch.mutex);
 	defer mutex_unlock(&ch.mutex);
 
-	if ch.is_buffered {
-		if len(ch.queue) > 0 {
-			return channel_read(ch);
-		}
-	} else if ch.w_waiting > 0 {
+	if len(ch.queue) > 0 {
 		return channel_read(ch);
 	}
 
 	return T{}, false;
 }
+
+
+
+channel_select :: proc(readers, writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) {
+	Candidate :: struct {
+		ch:    C,
+		msg:   T,
+		index: int,
+		read:  bool,
+	};
+
+	count := 0;
+	candidates := make([]Candidate, len(readers) + len(writers));
+	defer delete(candidates);
+
+	for c, i in readers {
+		if channel_can_read(c) {
+			candidates[count] = {
+				ch = c,
+				index = i,
+				read = true,
+			};
+			count += 1;
+		}
+	}
+
+	for c, i in writers {
+		if channel_can_write(c) {
+			candidates[count] = {
+				ch = c,
+				index = count,
+				read = false,
+				msg = write_msgs[i],
+			};
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		return T{}, -1;
+	}
+
+	// Randomize the input
+	r := rand.create(time.read_cycle_counter());
+	s := candidates[rand.int_max(count, &r)];
+	if s.read {
+		ok: bool;
+		if read_msg, ok = channel_read(s.ch); !ok {
+			index = -1;
+			return;
+		}
+	} else {
+		if !channel_write(s.ch, s.msg) {
+			index = -1;
+			return;
+		}
+	}
+
+	index = s.index;
+	return;
+}
+
+
+channel_select_write :: proc(writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) {
+	return channel_select([]C{}, writers, msg);
+}
+channel_select_read :: proc(readers: []$C/Channel($T)) -> (index: int) {
+	_, index = channel_select(readers, []C{}, nil);
+	return;
+}