Browse Source

Minor changes to sync/chan (HIGHLY EXPERIMENTAL)

gingerBill 1 year ago
parent
commit
656de10ba4
1 changed files with 122 additions and 34 deletions
  1. 122 34
      core/sync/chan/chan.odin

+ 122 - 34
core/sync/chan/chan.odin

@@ -7,12 +7,6 @@ import "core:mem"
 import "core:sync"
 import "core:sync"
 import "core:math/rand"
 import "core:math/rand"
 
 
-_ :: runtime
-_ :: mem
-_ :: sync
-
-
-
 Direction :: enum {
 Direction :: enum {
 	Send = -1,
 	Send = -1,
 	Both =  0,
 	Both =  0,
@@ -20,29 +14,28 @@ Direction :: enum {
 }
 }
 
 
 Chan :: struct($T: typeid, $D: Direction = Direction.Both) {
 Chan :: struct($T: typeid, $D: Direction = Direction.Both) {
-	#subtype impl: ^Raw_Chan,
+	#subtype impl: ^Raw_Chan `fmt:"-"`,
 }
 }
 
 
 Raw_Chan :: struct {
 Raw_Chan :: struct {
-	allocator: runtime.Allocator,
+	// Shared
+	allocator:       runtime.Allocator,
 	allocation_size: int,
 	allocation_size: int,
+	msg_size:        u16,
+	closed:          b16, // atomic
+	mutex:           sync.Mutex,
+	r_cond:          sync.Cond,
+	w_cond:          sync.Cond,
+	r_waiting:       int,  // atomic
+	w_waiting:       int,  // atomic
 
 
 	// Buffered
 	// Buffered
 	queue: ^Raw_Queue,
 	queue: ^Raw_Queue,
 
 
 	// Unbuffered
 	// Unbuffered
-	r_mutex: sync.Mutex,
-	w_mutex: sync.Mutex,
+	r_mutex:         sync.Mutex,
+	w_mutex:         sync.Mutex,
 	unbuffered_data: rawptr,
 	unbuffered_data: rawptr,
-	msg_size:        int,
-
-	// Shared
-	mutex:     sync.Mutex,
-	r_cond:    sync.Cond,
-	w_cond:    sync.Cond,
-	closed:    bool, // atomic
-	r_waiting: int,  // atomic
-	w_waiting: int,  // atomic
 }
 }
 
 
 
 
@@ -52,13 +45,15 @@ create :: proc{
 }
 }
 
 
 @(require_results)
 @(require_results)
-create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) {
+create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error)
+	where size_of(T) <= int(max(u16)) {
 	c.impl, err = create_raw_unbuffered(size_of(T), align_of(T), allocator)
 	c.impl, err = create_raw_unbuffered(size_of(T), align_of(T), allocator)
 	return
 	return
 }
 }
 
 
 @(require_results)
 @(require_results)
-create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) {
+create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error)
+	where size_of(T) <= int(max(u16)) {
 	c.impl, err = create_raw_buffered(size_of(T), align_of(T), cap, allocator)
 	c.impl, err = create_raw_buffered(size_of(T), align_of(T), cap, allocator)
 	return
 	return
 }
 }
@@ -70,6 +65,7 @@ create_raw :: proc{
 
 
 @(require_results)
 @(require_results)
 create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
 create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
+	assert(msg_size <= int(max(u16)))
 	align := max(align_of(Raw_Chan), msg_alignment)
 	align := max(align_of(Raw_Chan), msg_alignment)
 
 
 	size := mem.align_forward_int(size_of(Raw_Chan), align)
 	size := mem.align_forward_int(size_of(Raw_Chan), align)
@@ -81,12 +77,13 @@ create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator:
 	c = (^Raw_Chan)(ptr)
 	c = (^Raw_Chan)(ptr)
 	c.allocation_size = size
 	c.allocation_size = size
 	c.unbuffered_data = ([^]byte)(ptr)[offset:]
 	c.unbuffered_data = ([^]byte)(ptr)[offset:]
-	c.msg_size = msg_size
+	c.msg_size = u16(msg_size)
 	return
 	return
 }
 }
 
 
 @(require_results)
 @(require_results)
 create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
 create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
+	assert(msg_size <= int(max(u16)))
 	if cap <= 0 {
 	if cap <= 0 {
 		return create_raw_unbuffered(msg_size, msg_alignment, allocator)
 		return create_raw_unbuffered(msg_size, msg_alignment, allocator)
 	}
 	}
@@ -97,7 +94,7 @@ create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap:
 	q_offset := size
 	q_offset := size
 	size = mem.align_forward_int(q_offset + size_of(Raw_Queue), msg_alignment)
 	size = mem.align_forward_int(q_offset + size_of(Raw_Queue), msg_alignment)
 	offset := size
 	offset := size
-	size += msg_size * (cap+1)
+	size += msg_size * cap
 	size = mem.align_forward_int(size, align)
 	size = mem.align_forward_int(size, align)
 
 
 	ptr := mem.alloc(size, align, allocator) or_return
 	ptr := mem.alloc(size, align, allocator) or_return
@@ -107,20 +104,18 @@ create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap:
 	bptr := ([^]byte)(ptr)
 	bptr := ([^]byte)(ptr)
 
 
 	c.queue = (^Raw_Queue)(bptr[q_offset:])
 	c.queue = (^Raw_Queue)(bptr[q_offset:])
-	c.msg_size = msg_size
+	c.msg_size = u16(msg_size)
 
 
-	items := ([^]byte)(bptr[offset:])
-	c.unbuffered_data = items
-	raw_queue_init(c.queue, items[msg_size:], cap, msg_size)
+	raw_queue_init(c.queue, ([^]byte)(bptr[offset:]), cap, msg_size)
 	return
 	return
 }
 }
 
 
-destroy :: proc(c: ^Raw_Chan) -> runtime.Allocator_Error {
+destroy :: proc(c: ^Raw_Chan) -> (err: runtime.Allocator_Error) {
 	if c != nil {
 	if c != nil {
 		allocator := c.allocator
 		allocator := c.allocator
-		return mem.free_with_size(c, c.allocation_size, allocator)
+		err = mem.free_with_size(c, c.allocation_size, allocator)
 	}
 	}
-	return nil
+	return
 }
 }
 
 
 @(require_results)
 @(require_results)
@@ -139,6 +134,13 @@ send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D
 	return
 	return
 }
 }
 
 
+@(require_results)
+try_send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both {
+	data := data
+	ok = try_send_raw(c, &data)
+	return
+}
+
 @(require_results)
 @(require_results)
 recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both {
 recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both {
 	ok = recv_raw(c, &data)
 	ok = recv_raw(c, &data)
@@ -146,6 +148,13 @@ recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >=
 }
 }
 
 
 
 
+@(require_results)
+try_recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both {
+	ok = try_recv_raw(c, &data)
+	return
+}
+
+
 @(require_results)
 @(require_results)
 send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
 send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
 	if c == nil {
 	if c == nil {
@@ -171,7 +180,7 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
 			return false
 			return false
 		}
 		}
 
 
-		mem.copy(c.unbuffered_data, msg_in, c.msg_size)
+		mem.copy(c.unbuffered_data, msg_in, int(c.msg_size))
 		sync.atomic_add(&c.w_waiting, 1)
 		sync.atomic_add(&c.w_waiting, 1)
 		if sync.atomic_load(&c.r_waiting) > 0 {
 		if sync.atomic_load(&c.r_waiting) > 0 {
 			sync.signal(&c.r_cond)
 			sync.signal(&c.r_cond)
@@ -201,7 +210,7 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
 
 
 		msg := raw_queue_pop(c.queue)
 		msg := raw_queue_pop(c.queue)
 		if msg != nil {
 		if msg != nil {
-			mem.copy(msg_out, msg, c.msg_size)
+			mem.copy(msg_out, msg, int(c.msg_size))
 		}
 		}
 
 
 		if sync.atomic_load(&c.w_waiting) > 0 {
 		if sync.atomic_load(&c.w_waiting) > 0 {
@@ -223,7 +232,7 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
 			return
 			return
 		}
 		}
 
 
-		mem.copy(msg_out, c.unbuffered_data, c.msg_size)
+		mem.copy(msg_out, c.unbuffered_data, int(c.msg_size))
 		sync.atomic_sub(&c.w_waiting, 1)
 		sync.atomic_sub(&c.w_waiting, 1)
 
 
 		sync.signal(&c.w_cond)
 		sync.signal(&c.w_cond)
@@ -233,11 +242,90 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
 }
 }
 
 
 
 
+@(require_results)
+try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
+	if c == nil {
+		return false
+	}
+	if c.queue != nil { // buffered
+		sync.guard(&c.mutex)
+		if c.queue.len == c.queue.cap {
+			return false
+		}
+
+		ok = raw_queue_push(c.queue, msg_in)
+		if sync.atomic_load(&c.r_waiting) > 0 {
+			sync.signal(&c.r_cond)
+		}
+	} else if c.unbuffered_data != nil { // unbuffered
+		sync.guard(&c.w_mutex)
+		sync.guard(&c.mutex)
+
+		if sync.atomic_load(&c.closed) {
+			return false
+		}
+
+		mem.copy(c.unbuffered_data, msg_in, int(c.msg_size))
+		sync.atomic_add(&c.w_waiting, 1)
+		if sync.atomic_load(&c.r_waiting) > 0 {
+			sync.signal(&c.r_cond)
+		}
+		sync.wait(&c.w_cond, &c.mutex)
+		ok = true
+	}
+	return
+}
+
+@(require_results)
+try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool {
+	if c == nil {
+		return false
+	}
+	if c.queue != nil { // buffered
+		sync.guard(&c.mutex)
+		if c.queue.len == 0 {
+			return false
+		}
+
+		msg := raw_queue_pop(c.queue)
+		if msg != nil {
+			mem.copy(msg_out, msg, int(c.msg_size))
+		}
+
+		if sync.atomic_load(&c.w_waiting) > 0 {
+			sync.signal(&c.w_cond)
+		}
+		return true
+	} else if c.unbuffered_data != nil { // unbuffered
+		sync.guard(&c.r_mutex)
+		sync.guard(&c.mutex)
+
+		if sync.atomic_load(&c.closed) ||
+		   sync.atomic_load(&c.w_waiting) == 0 {
+		   	return false
+		}
+
+		mem.copy(msg_out, c.unbuffered_data, int(c.msg_size))
+		sync.atomic_sub(&c.w_waiting, 1)
+
+		sync.signal(&c.w_cond)
+		return true
+	}
+	return false
+}
+
+
+
 @(require_results)
 @(require_results)
 is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool {
 is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool {
 	return c != nil && c.queue != nil
 	return c != nil && c.queue != nil
 }
 }
 
 
+@(require_results)
+is_unbuffered :: proc "contextless" (c: ^Raw_Chan) -> bool {
+	return c != nil && c.unbuffered_data != nil
+}
+
 @(require_results)
 @(require_results)
 len :: proc "contextless" (c: ^Raw_Chan) -> int {
 len :: proc "contextless" (c: ^Raw_Chan) -> int {
 	if c != nil && c.queue != nil {
 	if c != nil && c.queue != nil {
@@ -276,7 +364,7 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {
 		return true
 		return true
 	}
 	}
 	sync.guard(&c.mutex)
 	sync.guard(&c.mutex)
-	return sync.atomic_load(&c.closed)
+	return bool(sync.atomic_load(&c.closed))
 }
 }