|
@@ -0,0 +1,401 @@
|
|
|
|
+package sync_chan
|
|
|
|
+
|
|
|
|
+import "base:builtin"
|
|
|
|
+import "base:intrinsics"
|
|
|
|
+import "base:runtime"
|
|
|
|
+import "core:mem"
|
|
|
|
+import "core:sync"
|
|
|
|
+import "core:math/rand"
|
|
|
|
+
|
|
|
|
+_ :: runtime
|
|
|
|
+_ :: mem
|
|
|
|
+_ :: sync
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+Direction :: enum {
|
|
|
|
+ Send = -1,
|
|
|
|
+ Both = 0,
|
|
|
|
+ Recv = +1,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+Chan :: struct($T: typeid, $D: Direction = Direction.Both) {
|
|
|
|
+ #subtype impl: ^Raw_Chan,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+Raw_Chan :: struct {
|
|
|
|
+ allocator: runtime.Allocator,
|
|
|
|
+ allocation_size: int,
|
|
|
|
+
|
|
|
|
+ // Buffered
|
|
|
|
+ queue: ^Raw_Queue,
|
|
|
|
+
|
|
|
|
+ // Unbuffered
|
|
|
|
+ r_mutex: sync.Mutex,
|
|
|
|
+ w_mutex: sync.Mutex,
|
|
|
|
+ unbuffered_data: rawptr,
|
|
|
|
+ msg_size: int,
|
|
|
|
+
|
|
|
|
+ // Shared
|
|
|
|
+ mutex: sync.Mutex,
|
|
|
|
+ r_cond: sync.Cond,
|
|
|
|
+ w_cond: sync.Cond,
|
|
|
|
+ closed: bool, // atomic
|
|
|
|
+ r_waiting: int, // atomic
|
|
|
|
+ w_waiting: int, // atomic
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+create :: proc{
|
|
|
|
+ create_unbuffered,
|
|
|
|
+ create_buffered,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) {
|
|
|
|
+ c.impl, err = create_raw_unbuffered(size_of(T), align_of(T), allocator)
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) {
|
|
|
|
+ c.impl, err = create_raw_buffered(size_of(T), align_of(T), cap, allocator)
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+create_raw :: proc{
|
|
|
|
+ create_raw_unbuffered,
|
|
|
|
+ create_raw_buffered,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
|
|
|
|
+ align := max(align_of(Raw_Chan), msg_alignment)
|
|
|
|
+
|
|
|
|
+ size := mem.align_forward_int(size_of(Raw_Chan), align)
|
|
|
|
+ offset := size
|
|
|
|
+ size += msg_size
|
|
|
|
+ size = mem.align_forward_int(size, align)
|
|
|
|
+
|
|
|
|
+ ptr := mem.alloc(size, align, allocator) or_return
|
|
|
|
+ c = (^Raw_Chan)(ptr)
|
|
|
|
+ c.allocation_size = size
|
|
|
|
+ c.unbuffered_data = ([^]byte)(ptr)[offset:]
|
|
|
|
+ c.msg_size = msg_size
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
|
|
|
|
+ if cap <= 0 {
|
|
|
|
+ return create_raw_unbuffered(msg_size, msg_alignment, allocator)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ align := max(align_of(Raw_Chan), msg_alignment, align_of(Raw_Queue))
|
|
|
|
+
|
|
|
|
+ size := mem.align_forward_int(size_of(Raw_Chan), align)
|
|
|
|
+ q_offset := size
|
|
|
|
+ size = mem.align_forward_int(q_offset + size_of(Raw_Queue), msg_alignment)
|
|
|
|
+ offset := size
|
|
|
|
+ size += msg_size * (cap+1)
|
|
|
|
+ size = mem.align_forward_int(size, align)
|
|
|
|
+
|
|
|
|
+ ptr := mem.alloc(size, align, allocator) or_return
|
|
|
|
+ c = (^Raw_Chan)(ptr)
|
|
|
|
+ c.allocation_size = size
|
|
|
|
+
|
|
|
|
+ bptr := ([^]byte)(ptr)
|
|
|
|
+
|
|
|
|
+ c.queue = (^Raw_Queue)(bptr[q_offset:])
|
|
|
|
+ c.msg_size = msg_size
|
|
|
|
+
|
|
|
|
+ items := ([^]byte)(bptr[offset:])
|
|
|
|
+ c.unbuffered_data = items
|
|
|
|
+ raw_queue_init(c.queue, items[msg_size:], cap, msg_size)
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+destroy :: proc(c: ^Raw_Chan) -> runtime.Allocator_Error {
|
|
|
|
+ if c != nil {
|
|
|
|
+ allocator := c.allocator
|
|
|
|
+ return mem.free_with_size(c, c.allocation_size, allocator)
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+as_send :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (s: Chan(T, .Send)) where C.D <= .Both {
|
|
|
|
+ return transmute(type_of(s))c
|
|
|
|
+}
|
|
|
|
+@(require_results)
|
|
|
|
+as_recv :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (r: Chan(T, .Recv)) where C.D >= .Both {
|
|
|
|
+ return transmute(type_of(r))c
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both {
|
|
|
|
+ data := data
|
|
|
|
+ ok = send_raw(c, &data)
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both {
|
|
|
|
+ ok = recv_raw(c, &data)
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
|
|
|
|
+ if c == nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ if c.queue != nil { // buffered
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ for c.queue.len == c.queue.cap {
|
|
|
|
+ sync.atomic_add(&c.w_waiting, 1)
|
|
|
|
+ sync.wait(&c.w_cond, &c.mutex)
|
|
|
|
+ sync.atomic_sub(&c.w_waiting, 1)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ok = raw_queue_push(c.queue, msg_in)
|
|
|
|
+ if sync.atomic_load(&c.r_waiting) > 0 {
|
|
|
|
+ sync.signal(&c.r_cond)
|
|
|
|
+ }
|
|
|
|
+ } else if c.unbuffered_data != nil { // unbuffered
|
|
|
|
+ sync.guard(&c.w_mutex)
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+
|
|
|
|
+ if sync.atomic_load(&c.closed) {
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ mem.copy(c.unbuffered_data, msg_in, c.msg_size)
|
|
|
|
+ sync.atomic_add(&c.w_waiting, 1)
|
|
|
|
+ if sync.atomic_load(&c.r_waiting) > 0 {
|
|
|
|
+ sync.signal(&c.r_cond)
|
|
|
|
+ }
|
|
|
|
+ sync.wait(&c.w_cond, &c.mutex)
|
|
|
|
+ ok = true
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
|
|
|
|
+ if c == nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ if c.queue != nil { // buffered
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ for c.queue.len == 0 {
|
|
|
|
+ if sync.atomic_load(&c.closed) {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ sync.atomic_add(&c.r_waiting, 1)
|
|
|
|
+ sync.wait(&c.r_cond, &c.mutex)
|
|
|
|
+ sync.atomic_sub(&c.r_waiting, 1)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ msg := raw_queue_pop(c.queue)
|
|
|
|
+ if msg != nil {
|
|
|
|
+ mem.copy(msg_out, msg, c.msg_size)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if sync.atomic_load(&c.w_waiting) > 0 {
|
|
|
|
+ sync.signal(&c.w_cond)
|
|
|
|
+ }
|
|
|
|
+ ok = true
|
|
|
|
+ } else if c.unbuffered_data != nil { // unbuffered
|
|
|
|
+ sync.guard(&c.r_mutex)
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+
|
|
|
|
+ for !sync.atomic_load(&c.closed) &&
|
|
|
|
+ sync.atomic_load(&c.w_waiting) == 0 {
|
|
|
|
+ sync.atomic_add(&c.r_waiting, 1)
|
|
|
|
+ sync.wait(&c.r_cond, &c.mutex)
|
|
|
|
+ sync.atomic_sub(&c.r_waiting, 1)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if sync.atomic_load(&c.closed) {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ mem.copy(msg_out, c.unbuffered_data, c.msg_size)
|
|
|
|
+ sync.atomic_sub(&c.w_waiting, 1)
|
|
|
|
+
|
|
|
|
+ sync.signal(&c.w_cond)
|
|
|
|
+ ok = true
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool {
|
|
|
|
+ return c != nil && c.queue != nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+len :: proc "contextless" (c: ^Raw_Chan) -> int {
|
|
|
|
+ if c != nil && c.queue != nil {
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ return c.queue.len
|
|
|
|
+ }
|
|
|
|
+ return 0
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+cap :: proc "contextless" (c: ^Raw_Chan) -> int {
|
|
|
|
+ if c != nil && c.queue != nil {
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ return c.queue.cap
|
|
|
|
+ }
|
|
|
|
+ return 0
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+close :: proc "contextless" (c: ^Raw_Chan) -> bool {
|
|
|
|
+ if c == nil {
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ if sync.atomic_load(&c.closed) {
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+ sync.atomic_store(&c.closed, true)
|
|
|
|
+ sync.broadcast(&c.r_cond)
|
|
|
|
+ sync.broadcast(&c.w_cond)
|
|
|
|
+ return true
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {
|
|
|
|
+ if c == nil {
|
|
|
|
+ return true
|
|
|
|
+ }
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ return sync.atomic_load(&c.closed)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+Raw_Queue :: struct {
|
|
|
|
+ data: [^]byte,
|
|
|
|
+ len: int,
|
|
|
|
+ cap: int,
|
|
|
|
+ next: int,
|
|
|
|
+ size: int, // element size
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+raw_queue_init :: proc "contextless" (q: ^Raw_Queue, data: rawptr, cap: int, size: int) {
|
|
|
|
+ q.data = ([^]byte)(data)
|
|
|
|
+ q.len = 0
|
|
|
|
+ q.cap = cap
|
|
|
|
+ q.next = 0
|
|
|
|
+ q.size = size
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+raw_queue_push :: proc "contextless" (q: ^Raw_Queue, data: rawptr) -> bool {
|
|
|
|
+ if q.len == q.cap {
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+ pos := q.next + q.len
|
|
|
|
+ if pos >= q.cap {
|
|
|
|
+ pos -= q.cap
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ val_ptr := q.data[pos*q.size:]
|
|
|
|
+ mem.copy(val_ptr, data, q.size)
|
|
|
|
+ q.len += 1
|
|
|
|
+ return true
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+raw_queue_pop :: proc "contextless" (q: ^Raw_Queue) -> (data: rawptr) {
|
|
|
|
+ if q.len > 0 {
|
|
|
|
+ data = q.data[q.next*q.size:]
|
|
|
|
+ q.next += 1
|
|
|
|
+ q.len -= 1
|
|
|
|
+ if q.next >= q.cap {
|
|
|
|
+ q.next -= q.cap
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool {
|
|
|
|
+ if is_buffered(c) {
|
|
|
|
+ return len(c) > 0
|
|
|
|
+ }
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ return sync.atomic_load(&c.w_waiting) > 0
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+can_send :: proc "contextless" (c: ^Raw_Chan) -> bool {
|
|
|
|
+ if is_buffered(c) {
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ return len(c) < cap(c)
|
|
|
|
+ }
|
|
|
|
+ sync.guard(&c.mutex)
|
|
|
|
+ return sync.atomic_load(&c.r_waiting) > 0
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@(require_results)
|
|
|
|
+select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, ok: bool) #no_bounds_check {
|
|
|
|
+ Select_Op :: struct {
|
|
|
|
+ idx: int, // local to the slice that was given
|
|
|
|
+ is_recv: bool,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ candidate_count := builtin.len(recvs)+builtin.len(sends)
|
|
|
|
+ candidates := ([^]Select_Op)(intrinsics.alloca(candidate_count*size_of(Select_Op), align_of(Select_Op)))
|
|
|
|
+ count := 0
|
|
|
|
+
|
|
|
|
+ for c, i in recvs {
|
|
|
|
+ if can_recv(c) {
|
|
|
|
+ candidates[count] = {
|
|
|
|
+ is_recv = true,
|
|
|
|
+ idx = i,
|
|
|
|
+ }
|
|
|
|
+ count += 1
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for c, i in sends {
|
|
|
|
+ if can_send(c) {
|
|
|
|
+ candidates[count] = {
|
|
|
|
+ is_recv = false,
|
|
|
|
+ idx = i,
|
|
|
|
+ }
|
|
|
|
+ count += 1
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if count == 0 {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ r: ^rand.Rand = nil
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ select_idx = rand.int_max(count, r) if count > 0 else 0
|
|
|
|
+
|
|
|
|
+ sel := candidates[select_idx]
|
|
|
|
+ if sel.is_recv {
|
|
|
|
+ ok = recv_raw(recvs[sel.idx], recv_out)
|
|
|
|
+ } else {
|
|
|
|
+ ok = send_raw(sends[sel.idx], send_msgs[sel.idx])
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+}
|