Selaa lähdekoodia

Update sync.Channel

gingerBill 5 vuotta sitten
vanhempi
commit
fc65aee307
1 muutettua tiedostoa jossa 288 lisäystä ja 193 poistoa
  1. 288 193
      core/sync/channel.odin

+ 288 - 193
core/sync/channel.odin

@@ -2,290 +2,385 @@ package sync
 
 import "core:mem"
 import "core:time"
+import "core:fmt"
+import "core:intrinsics"
 import "core:math/rand"
 
 _, _ :: time, rand;
 
-chan :: struct(T: typeid) {
-	qlen: uint,
-	qcap: uint,
-	closed: b32,
-	sendx: uint,
-	recvx: uint,
-	mutex: Blocking_Mutex,
-	allocator: mem.Allocator,
 
-	buf: [0]T,
+Channel :: struct(T: typeid) {
+	using _internal: ^Raw_Channel,
 }
 
-makechan :: proc($T: typeid, cap: int, allocator := context.allocator) -> ^chan(T) {
-	chan_size :: size_of(chan(T));
-	chan_align :: align_of(chan(T));
+channel_init :: proc(ch: ^$C/Channel($T), cap := 0, allocator := context.allocator) {
+	context.allocator = allocator;
+	ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+	return;
+}
 
-	mem := uintptr(cap) * size_of(T);
-	c := cast(^chan(T))mem.alloc(chan_size+mem, chan_align, allocator);
-	c.allocator = allocator;
-	c.qlen = 0;
-	c.qcap = uint(cap);
-	blocking_mutex_init(&c.mutex);
-	return c;
+channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T)) {
+	context.allocator = allocator;
+	ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+	return;
 }
-chanbuf :: proc(c: ^$C/chan($T)) -> []T #no_bounds_check {
-	return c.buf[0:c.qcap];
+
+channel_destroy :: proc(ch: $C/Channel($T)) {
+	raw_channel_destroy(ch._internal);
 }
 
 
+channel_len :: proc(ch: $C/Channel($T)) -> int {
+	return ch._internal.len;
+}
+channel_cap :: proc(ch: $C/Channel($T)) -> int {
+	return ch._internal.cap;
+}
 
 
-/*
+channel_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) {
+	msg := msg;
+	_ = raw_channel_send_impl(ch._internal, &msg, false, loc);
+}
+channel_try_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) -> bool {
+	msg := msg;
+	return raw_channel_send_impl(ch._internal, &msg, true, loc);
+}
 
-Channel :: struct(T: typeid) {
-	using internal: ^_Channel_Internal(T),
+channel_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T) {
+	c := ch._internal;
+	mutex_lock(&c.mutex);
+	raw_channel_recv_impl(c, &msg, loc);
+	mutex_unlock(&c.mutex);
+	return;
+}
+channel_try_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T, ok: bool) {
+	c := ch._internal;
+	if mutex_try_lock(&c.mutex) {
+		if c.len > 0 {
+			raw_channel_recv_impl(c, &msg, loc);
+			ok = true;
+		}
+		mutex_unlock(&c.mutex);
+	}
+	return;
 }
 
-_Channel_Internal :: struct(T: typeid) {
-	allocator: mem.Allocator,
+channel_is_nil :: proc(ch: $C/Channel($T)) -> bool {
+	return ch._internal == nil;
+}
 
-	queue: [dynamic]T,
 
-	unbuffered_msg: T, // Will be used as the backing to the queue if no `cap` is given
+channel_eq :: proc(a, b: $C/Channel($T)) -> bool {
+	return a._internal == b._internal;
+}
+channel_ne :: proc(a, b: $C/Channel($T)) -> bool {
+	return a._internal != b._internal;
+}
 
-	mutex:  Mutex,
-	r_cond: Condition,
-	w_cond: Condition,
 
-	closed:    bool,
-	r_waiting: int,
-	w_waiting: int,
+channel_can_send :: proc(ch: $C/Channel($T)) -> (ok: bool) {
+	return raw_channel_can_send(ch._internal);
 }
-
-channel_init :: proc(c: ^$C/Channel($T), cap: int = 0, allocator := context.allocator) {
-	c^ = cast(C)channel_make(T, cap, allocator);
+channel_can_recv :: proc(ch: $C/Channel($T)) -> (ok: bool) {
+	return raw_channel_can_recv(ch._internal);
 }
 
-channel_make :: proc($T: typeid, cap: int = 0, allocator := context.allocator) -> (ch: Channel(T)) {
-	ch.internal = new(_Channel_Internal(T), allocator);
-	if ch.internal == nil {
-		return {};
+
+
+channel_peek :: proc(ch: $C/Channel($T)) -> int {
+	c := ch._internal;
+	if c == nil {
+		return -1;
 	}
-	ch.allocator = allocator;
-
-	mutex_init(&ch.mutex);
-	condition_init(&ch.r_cond, &ch.mutex);
-	condition_init(&ch.w_cond, &ch.mutex);
-	ch.closed = false;
-	ch.r_waiting = 0;
-	ch.w_waiting = 0;
-	ch.unbuffered_msg = T{};
-
-	if cap > 0 {
-		ch.queue = make([dynamic]T, 0, cap, ch.allocator);
-	} else {
-		d := mem.Raw_Dynamic_Array{
-			data = &ch.unbuffered_msg,
-			len  = 0,
-			cap  = 1,
-			allocator = mem.nil_allocator(),
-		};
-		ch.queue = transmute([dynamic]T)d;
+	if intrinsics.atomic_load(&c.closed) {
+		return -1;
 	}
-	return ch;
+	return intrinsics.atomic_load(&c.len);
 }
 
-channel_destroy :: proc(ch: $C/Channel($T)) {
-	channel_close(ch);
 
-	if channel_is_buffered(ch) {
-		delete(ch.queue);
+channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) {
+	c := ch._internal;
+	if c == nil {
+		panic(message="cannot close nil channel", loc=loc);
 	}
-
-	mutex_destroy(&ch.mutex);
-	condition_destroy(&ch.r_cond);
-	condition_destroy(&ch.w_cond);
-	free(ch.internal, ch.allocator);
+	intrinsics.atomic_store(&c.closed, true);
 }
 
-channel_close :: proc(ch: $C/Channel($T)) -> (ok: bool) {
-	mutex_lock(&ch.mutex);
 
-	if !ch.closed {
-		ch.closed = true;
-		condition_broadcast(&ch.r_cond);
-		condition_broadcast(&ch.w_cond);
-		ok = true;
+channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, open: bool) {
+	c := ch._internal;
+	switch {
+	case c == nil:
+		return;
+	case intrinsics.atomic_load(&c.closed):
+		if channel_can_recv(ch) {
+			val = channel_recv(ch);
+			open = true;
+		}
+	case:
+		val = channel_recv(ch);
+		open = true;
 	}
-
-	mutex_unlock(&ch.mutex);
 	return;
 }
 
-channel_write :: proc(ch: $C/Channel($T), msg: T) -> (ok: bool) {
-	mutex_lock(&ch.mutex);
-	defer mutex_unlock(&ch.mutex);
 
-	if ch.closed {
+channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
+	backing: [64]int;
+	candidates := backing[:];
+	if len(channels) > len(backing) {
+		candidates = make([]int, len(channels), context.temp_allocator);
+	}
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_recv(c) {
+			candidates[i] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		index = -1;
 		return;
 	}
 
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
 
-	for len(ch.queue) == cap(ch.queue) {
-		ch.w_waiting += 1;
-		condition_wait_for(&ch.w_cond);
-		ch.w_waiting -= 1;
+	index = candidates[i % count];
+	return;
+}
+
+
+channel_select_send :: proc(channels: ..^Raw_Channel) -> (index: int) {
+	backing: [64]int;
+	candidates := backing[:];
+	if len(channels) > len(backing) {
+		candidates = make([]int, len(channels), context.temp_allocator);
 	}
 
-	if len(ch.queue) < cap(ch.queue) {
-		append(&ch.queue, msg);
-		ok = true;
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_send(c) {
+			candidates[i] = i;
+			count += 1;
+		}
 	}
 
-	if ch.r_waiting > 0 {
-		condition_signal(&ch.r_cond);
+	if count == 0 {
+		index = -1;
+		return;
 	}
 
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
 	return;
 }
 
-channel_read :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) #optional_ok {
-	mutex_lock(&ch.mutex);
-	defer mutex_unlock(&ch.mutex);
+channel_select_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) {
+	backing: [64]int;
+	candidates := backing[:];
+	if len(channels) > len(backing) {
+		candidates = make([]int, len(channels), context.temp_allocator);
+	}
 
-	for len(ch.queue) == 0 {
-		if ch.closed {
-			return;
+	count := u32(0);
+	for c, i in channels {
+		if channel_can_recv(c) {
+			candidates[i] = i;
+			count += 1;
 		}
-
-		ch.r_waiting += 1;
-		condition_wait_for(&ch.r_cond);
-		ch.r_waiting -= 1;
 	}
 
-	msg, ok = pop_front(&ch.queue);
-
-	if ch.w_waiting > 0 {
-		condition_signal(&ch.w_cond);
+	if count == 0 {
+		index = -1;
+		return;
 	}
 
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	msg = channel_recv(channels[index]);
 	return;
 }
 
-channel_size :: proc(ch: $C/Channel($T)) -> (size: int) {
-	if channel_is_buffered(ch) {
-		mutex_lock(&ch.mutex);
-		size = len(ch.queue);
-		mutex_unlock(&ch.mutex);
+channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) {
+	backing: [64]int;
+	candidates := backing[:];
+	if len(channels) > len(backing) {
+		candidates = make([]int, len(channels), context.temp_allocator);
+	}
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_send(c) {
+			candidates[i] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		index = -1;
+		return;
 	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	channel_send(channels[index], msg);
 	return;
 }
 
-channel_is_closed :: proc(ch: $C/Channel($T)) -> bool {
-	mutex_lock(&ch.mutex);
-	closed := ch.closed;
-	mutex_unlock(&ch.mutex);
-	return closed;
-}
 
-channel_is_buffered :: proc(ch: $C/Channel($T)) -> bool {
-	q := transmute(mem.Raw_Dynamic_Array)ch.queue;
-	return q.cap != 0 && (q.data != &ch.unbuffered_msg);
-}
 
-channel_can_write :: proc(ch: $C/Channel($T)) -> bool {
-	mutex_lock(&ch.mutex);
-	defer mutex_unlock(&ch.mutex);
-	return len(ch.queue) < cap(ch.queue);
-}
 
-channel_can_read :: proc(ch: $C/Channel($T)) -> bool {
-	mutex_lock(&ch.mutex);
-	defer mutex_unlock(&ch.mutex);
-	return len(ch.queue) > 0;
-}
 
-channel_can_read_write :: proc(ch: $C/Channel($T)) -> bool {
-	mutex_lock(&ch.mutex);
-	defer mutex_unlock(&ch.mutex);
-	return 0 < len(ch.queue) && len(ch.queue) < cap(ch.queue);
+
+Raw_Channel :: struct {
+	data:        rawptr,
+	elem_size:   int,
+	len, cap:    int,
+	read, write: int,
+	mutex:       Mutex,
+	cond:        Condition,
+	allocator:   mem.Allocator,
+	closed:      bool,
+	ready:       bool, // ready to recv
 }
 
-channel_iterator :: proc(ch: $C/Channel($T)) -> (elem: T, ok: bool) {
-	mutex_lock(&ch.mutex);
-	defer mutex_unlock(&ch.mutex);
 
-	if len(ch.queue) > 0 {
-		return channel_read(ch);
+raw_channel_create :: proc(elem_size, elem_align, cap: int) -> ^Raw_Channel {
+	s := size_of(Raw_Channel);
+	s = mem.align_forward_int(s, elem_align);
+	data_offset := uintptr(s);
+	s += elem_size * max(cap, 1);
+
+	a := max(elem_align, align_of(Raw_Channel));
+
+	c := (^Raw_Channel)(mem.alloc(s, a));
+	if c == nil {
+		return nil;
 	}
 
-	return T{}, false;
+	c.data = rawptr(uintptr(c) + data_offset);
+	c.elem_size = elem_size;
+	c.len, c.cap = 0, max(cap, 0);
+	c.read, c.write = 0, 0;
+	mutex_init(&c.mutex);
+	condition_init(&c.cond, &c.mutex);
+	c.allocator = context.allocator;
+	c.closed = false;
+
+	return c;
 }
 
 
+raw_channel_destroy :: proc(c: ^Raw_Channel) {
+	if c == nil {
+		return;
+	}
+	context.allocator = c.allocator;
+	c.closed = true;
 
-channel_select :: proc(readers, writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) {
-	Candidate :: struct {
-		ch:    C,
-		msg:   T,
-		index: int,
-		read:  bool,
-	};
+	condition_destroy(&c.cond);
+	mutex_destroy(&c.mutex);
+	free(c);
+}
 
-	count := 0;
-	candidates := make([]Candidate, len(readers) + len(writers));
-	defer delete(candidates);
 
-	for c, i in readers {
-		if channel_can_read(c) {
-			candidates[count] = {
-				ch = c,
-				index = i,
-				read = true,
-			};
-			count += 1;
-		}
+raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc := #caller_location) -> bool {
+	send :: proc(c: ^Raw_Channel, src: rawptr) {
+		dst := uintptr(c.data) + uintptr(c.write * c.elem_size);
+		mem.copy(rawptr(dst), src, c.elem_size);
+		c.len += 1;
+		c.write = (c.write + 1) % max(c.cap, 1);
 	}
 
-	for c, i in writers {
-		if channel_can_write(c) {
-			candidates[count] = {
-				ch = c,
-				index = count,
-				read = false,
-				msg = write_msgs[i],
-			};
-			count += 1;
-		}
+	switch {
+	case c == nil:
+		panic(message="cannot send message; channel is nil", loc=loc);
+	case c.closed:
+		panic(message="cannot send message; channel is closed", loc=loc);
 	}
 
-	if count == 0 {
-		return T{}, -1;
-	}
+	mutex_lock(&c.mutex);
+	if c.cap > 0 {
+		if no_block && c.len >= c.cap {
+			mutex_unlock(&c.mutex);
+			return false;
+		}
 
-	// Randomize the input
-	r := rand.create(time.read_cycle_counter());
-	s := candidates[rand.int_max(count, &r)];
-	if s.read {
-		ok: bool;
-		if read_msg, ok = channel_read(s.ch); !ok {
-			index = -1;
-			return;
+		for c.len >= c.cap {
+			condition_wait_for(&c.cond);
 		}
-	} else {
-		if !channel_write(s.ch, s.msg) {
-			index = -1;
-			return;
+	}
+
+	send(c, msg);
+	mutex_unlock(&c.mutex);
+	condition_signal(&c.cond);
+
+	return true;
+}
+
+raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) {
+	recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) {
+		if c.len < 1 {
+			panic(message="cannot recv message; channel is empty", loc=loc);
 		}
+		c.len -= 1;
+		src := uintptr(c.data) + uintptr(c.read * c.elem_size);
+		mem.copy(dst, rawptr(src), c.elem_size);
+		c.read = (c.read + 1) % max(c.cap, 1);
 	}
 
-	index = s.index;
-	return;
+	if c == nil {
+		panic(message="cannot recv message; channel is nil", loc=loc);
+	}
+	intrinsics.atomic_store(&c.ready, true);
+	for c.len < 1 {
+		condition_wait_for(&c.cond);
+	}
+	intrinsics.atomic_store(&c.ready, false);
+	recv(c, res, loc);
+	if c.cap > 0 && c.len == c.cap - 1 {
+		condition_signal(&c.cond);
+	}
 }
 
 
-channel_select_write :: proc(writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) {
-	return channel_select([]C{}, writers, msg);
+raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) {
+	if c == nil {
+		return false;
+	}
+	mutex_lock(&c.mutex);
+	switch {
+	case c.closed:
+		ok = false;
+	case c.cap > 0:
+		ok = c.len < c.cap;
+	case:
+		ok = !c.ready;
+	}
+	mutex_unlock(&c.mutex);
+	return;
 }
-channel_select_read :: proc(readers: []$C/Channel($T)) -> (index: int) {
-	_, index = channel_select(readers, []C{}, nil);
+raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) {
+	if c == nil {
+		return false;
+	}
+	mutex_lock(&c.mutex);
+	ok = c.len > 0;
+	mutex_unlock(&c.mutex);
 	return;
 }
-*/