Browse Source

Update package sync

gingerBill 5 years ago
parent
commit
6d032e6f1a
5 changed files with 606 additions and 147 deletions
  1. 591 144
      core/sync/channel.odin
  2. 5 0
      core/sync/sync_unix.odin
  3. 8 2
      core/sync/sync_windows.odin
  4. 1 0
      core/sys/windows/types.odin
  5. 1 1
      core/thread/thread.odin

+ 591 - 144
core/sync/channel.odin

@@ -1,6 +1,7 @@
 package sync
 
 import "core:mem"
+import "core:fmt"
 import "core:time"
 import "core:intrinsics"
 import "core:math/rand"
@@ -64,6 +65,15 @@ channel_try_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T
 	}
 	return;
 }
+channel_try_recv_ptr :: proc(ch: $C/Channel($T), msg: ^T, loc := #caller_location) -> (ok: bool) {
+	res: T;
+	res, ok = channel_try_recv(ch, loc);
+	if ok && msg != nil {
+		msg^ = res;
+	}
+	return;
+}
+
 
 channel_is_nil :: proc(ch: $C/Channel($T)) -> bool {
 	return ch._internal == nil;
@@ -90,7 +100,6 @@ channel_can_recv :: proc(ch: $C/Channel($T)) -> (ok: bool) {
 }
 
 
-
 channel_peek :: proc(ch: $C/Channel($T)) -> int {
 	c := ch._internal;
 	if c == nil {
@@ -104,11 +113,7 @@ channel_peek :: proc(ch: $C/Channel($T)) -> int {
 
 
 channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) {
-	c := ch._internal;
-	if c == nil {
-		panic(message="cannot close nil channel", loc=loc);
-	}
-	intrinsics.atomic_store(&c.closed, true);
+	raw_channel_close(ch._internal, loc);
 }
 
 
@@ -129,152 +134,51 @@ channel_drain :: proc(ch: $C/Channel($T)) {
 
 
 channel_move :: proc(dst, src: $C/Channel($T)) {
-	// for channel_len(src) > 0 {
-	// 	msg := channel_recv(src);
-	// 	channel_send(dst, msg);
-	// }
 	for msg in channel_iterator(src) {
 		channel_send(dst, msg);
 	}
 }
 
 
-
-channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
-	backing: [64]int;
-	candidates := backing[:];
-	cap := len(channels);
-	if cap > len(backing) {
-		candidates = make([]int, cap, context.temp_allocator);
-	} else {
-		candidates = candidates[:cap];
-	}
-
-	count := u32(0);
-	for c, i in channels {
-		if raw_channel_can_recv(c) {
-			candidates[i] = i;
-			count += 1;
-		}
-	}
-
-	if count == 0 {
-		index = -1;
-		return;
-	}
-
-	t := time.now();
-	r := rand.create(transmute(u64)t);
-	i := rand.uint32(&r);
-
-	index = candidates[i % count];
-	return;
+Raw_Channel_Wait_Queue :: struct {
+	next: ^Raw_Channel_Wait_Queue,
+	state: ^uintptr,
 }
 
 
-channel_select_send :: proc(channels: ..^Raw_Channel) -> (index: int) {
-	backing: [64]int;
-	candidates := backing[:];
-	if len(channels) > len(backing) {
-		candidates = make([]int, len(channels), context.temp_allocator);
-	}
-
-	count := u32(0);
-	for c, i in channels {
-		if raw_channel_can_send(c) {
-			candidates[i] = i;
-			count += 1;
-		}
-	}
-
-	if count == 0 {
-		index = -1;
-		return;
-	}
-
-	t := time.now();
-	r := rand.create(transmute(u64)t);
-	i := rand.uint32(&r);
+Raw_Channel :: struct {
+	closed:      bool,
+	ready:       bool, // ready to recv
+	data_offset: u16,  // data is stored at the end of this data structure
+	elem_size:   u32,
+	len, cap:    int,
+	read, write: int,
+	mutex:       Mutex,
+	cond:        Condition,
+	allocator:   mem.Allocator,
 
-	index = candidates[i % count];
-	return;
+	sendq: ^Raw_Channel_Wait_Queue,
+	recvq: ^Raw_Channel_Wait_Queue,
 }
 
-channel_select_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) {
-	backing: [64]int;
-	candidates := backing[:];
-	if len(channels) > len(backing) {
-		candidates = make([]int, len(channels), context.temp_allocator);
-	}
-
-	count := u32(0);
-	for c, i in channels {
-		if channel_can_recv(c) {
-			candidates[i] = i;
-			count += 1;
-		}
-	}
-
-	if count == 0 {
-		index = -1;
-		return;
-	}
-
-	t := time.now();
-	r := rand.create(transmute(u64)t);
-	i := rand.uint32(&r);
-
-	index = candidates[i % count];
-	msg = channel_recv(channels[index]);
-	return;
+raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
+	val.next = head^;
+	head^ = val;
 }
-
-channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) {
-	backing: [64]int;
-	candidates := backing[:];
-	if len(channels) > len(backing) {
-		candidates = make([]int, len(channels), context.temp_allocator);
-	}
-
-	count := u32(0);
-	for c, i in channels {
-		if raw_channel_can_send(c) {
-			candidates[i] = i;
-			count += 1;
-		}
+raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
+	p := head;
+	for p^ != nil && p^ != val {
+		p = &p^.next;
 	}
-
-	if count == 0 {
-		index = -1;
-		return;
+	if p != nil {
+		p^ = p^.next;
 	}
-
-	t := time.now();
-	r := rand.create(transmute(u64)t);
-	i := rand.uint32(&r);
-
-	index = candidates[i % count];
-	channel_send(channels[index], msg);
-	return;
 }
 
 
+raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel {
+	assert(int(u32(elem_size)) == elem_size);
 
-
-Raw_Channel :: struct {
-	data:        rawptr,
-	elem_size:   int,
-	len, cap:    int,
-	read, write: int,
-	mutex:       Mutex,
-	cond:        Condition,
-	allocator:   mem.Allocator,
-	closed:      bool,
-	ready:       bool, // ready to recv
-}
-
-
-raw_channel_create :: proc(elem_size, elem_align, cap: int) -> ^Raw_Channel {
 	s := size_of(Raw_Channel);
 	s = mem.align_forward_int(s, elem_align);
 	data_offset := uintptr(s);
@@ -287,8 +191,8 @@ raw_channel_create :: proc(elem_size, elem_align, cap: int) -> ^Raw_Channel {
 		return nil;
 	}
 
-	c.data = rawptr(uintptr(c) + data_offset);
-	c.elem_size = elem_size;
+	c.data_offset = u16(data_offset);
+	c.elem_size = u32(elem_size);
 	c.len, c.cap = 0, max(cap, 0);
 	c.read, c.write = 0, 0;
 	mutex_init(&c.mutex);
@@ -305,18 +209,34 @@ raw_channel_destroy :: proc(c: ^Raw_Channel) {
 		return;
 	}
 	context.allocator = c.allocator;
-	c.closed = true;
+	intrinsics.atomic_store(&c.closed, true);
 
 	condition_destroy(&c.cond);
 	mutex_destroy(&c.mutex);
 	free(c);
 }
 
+raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) {
+	if c == nil {
+		panic(message="cannot close nil channel", loc=loc);
+	}
+	mutex_lock(&c.mutex);
+	defer mutex_unlock(&c.mutex);
+	intrinsics.atomic_store(&c.closed, true);
+
+	// Release readers and writers
+	raw_channel_wait_queue_broadcast(c.recvq);
+	raw_channel_wait_queue_broadcast(c.sendq);
+	condition_broadcast(&c.cond);
+}
+
+
 
 raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool {
 	send :: proc(c: ^Raw_Channel, src: rawptr) {
-		dst := uintptr(c.data) + uintptr(c.write * c.elem_size);
-		mem.copy(rawptr(dst), src, c.elem_size);
+		data := uintptr(c) + uintptr(c.data_offset);
+		dst := data + uintptr(c.write * int(c.elem_size));
+		mem.copy(rawptr(dst), src, int(c.elem_size));
 		c.len += 1;
 		c.write = (c.write + 1) % max(c.cap, 1);
 	}
@@ -329,9 +249,10 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc :=
 	}
 
 	mutex_lock(&c.mutex);
+	defer mutex_unlock(&c.mutex);
+
 	if c.cap > 0 {
 		if !block && c.len >= c.cap {
-			mutex_unlock(&c.mutex);
 			return false;
 		}
 
@@ -339,12 +260,16 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc :=
 			condition_wait_for(&c.cond);
 		}
 	} else if c.len > 0 {
+		if !block {
+			return false;
+		}
 		condition_wait_for(&c.cond);
 	}
 
 	send(c, msg);
-	mutex_unlock(&c.mutex);
 	condition_signal(&c.cond);
+	raw_channel_wait_queue_signal(c.recvq);
+
 
 	return true;
 }
@@ -355,8 +280,10 @@ raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_locat
 			panic(message="cannot recv message; channel is empty", loc=loc);
 		}
 		c.len -= 1;
-		src := uintptr(c.data) + uintptr(c.read * c.elem_size);
-		mem.copy(dst, rawptr(src), c.elem_size);
+
+		data := uintptr(c) + uintptr(c.data_offset);
+		src := data + uintptr(c.read * int(c.elem_size));
+		mem.copy(dst, rawptr(src), int(c.elem_size));
 		c.read = (c.read + 1) % max(c.cap, 1);
 	}
 
@@ -365,6 +292,7 @@ raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_locat
 	}
 	intrinsics.atomic_store(&c.ready, true);
 	for c.len < 1 {
+		raw_channel_wait_queue_signal(c.sendq);
 		condition_wait_for(&c.cond);
 	}
 	intrinsics.atomic_store(&c.ready, false);
@@ -389,9 +317,9 @@ raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) {
 	case c.closed:
 		ok = false;
 	case c.cap > 0:
-		ok = c.len < c.cap;
+		ok = c.ready && c.len < c.cap;
 	case:
-		ok = !c.ready;
+		ok = c.ready && c.len == 0;
 	}
 	mutex_unlock(&c.mutex);
 	return;
@@ -417,3 +345,522 @@ raw_channel_drain :: proc(c: ^Raw_Channel) {
 	c.write = 0;
 	mutex_unlock(&c.mutex);
 }
+
+
+
+MAX_SELECT_CHANNELS :: 64;
+SELECT_MAX_TIMEOUT :: max(time.Duration);
+
+Select_Command :: enum {
+	Recv,
+	Send,
+}
+
+Select_Channel :: struct {
+	channel: ^Raw_Channel,
+	command: Select_Command,
+}
+
+
+
+select :: proc(channels: ..Select_Channel) -> (index: int) {
+	return select_timeout(SELECT_MAX_TIMEOUT, ..channels);
+}
+select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) {
+	switch len(channels) {
+	case 0:
+		panic("sync: select with no channels");
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+	backing: [MAX_SELECT_CHANNELS]int;
+	queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+	candidates := backing[:];
+	cap := len(channels);
+	candidates = candidates[:cap];
+
+	nil_channel := Raw_Channel{closed = true};
+
+	count := u32(0);
+	for c, i in channels {
+		if c.channel == nil {
+			continue;
+		}
+		switch c.command {
+		case .Recv:
+			if raw_channel_can_recv(c.channel) {
+				candidates[count] = i;
+				count += 1;
+			}
+		case .Send:
+			if raw_channel_can_send(c.channel) {
+				candidates[count] = i;
+				count += 1;
+			}
+		}
+	}
+
+	if count == 0 {
+		wait_state: uintptr = 0;
+		for c, i in channels {
+			q := &queues[i];
+			q.state = &wait_state;
+		}
+
+		for c, i in channels {
+			if c.channel == nil {
+				continue;
+			}
+			q := &queues[i];
+			switch c.command {
+			case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q);
+			case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q);
+			}
+		}
+		raw_channel_wait_queue_wait_on(&wait_state, timeout);
+		for c, i in channels {
+			if c.channel == nil {
+				continue;
+			}
+			q := &queues[i];
+			switch c.command {
+			case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q);
+			case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q);
+			}
+		}
+
+		for c, i in channels {
+			switch c.command {
+			case .Recv:
+				if raw_channel_can_recv(c.channel) {
+					candidates[count] = i;
+					count += 1;
+				}
+			case .Send:
+				if raw_channel_can_send(c.channel) {
+					candidates[count] = i;
+					count += 1;
+				}
+			}
+		}
+		if count == 0 && timeout == SELECT_MAX_TIMEOUT {
+			index = -1;
+			return;
+		}
+
+		assert(count != 0);
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	return;
+}
+
+select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
+	switch len(channels) {
+	case 0:
+		panic("sync: select with no channels");
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+	backing: [MAX_SELECT_CHANNELS]int;
+	queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+	candidates := backing[:];
+	cap := len(channels);
+	candidates = candidates[:cap];
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_recv(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		state: uintptr;
+		for c, i in channels {
+			q := &queues[i];
+			q.state = &state;
+			raw_channel_wait_queue_insert(&c.recvq, q);
+		}
+		raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
+		for c, i in channels {
+			q := &queues[i];
+			raw_channel_wait_queue_remove(&c.recvq, q);
+		}
+
+		for c, i in channels {
+			if raw_channel_can_recv(c) {
+				candidates[count] = i;
+				count += 1;
+			}
+		}
+		assert(count != 0);
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	return;
+}
+
+select_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) {
+	switch len(channels) {
+	case 0:
+		panic("sync: select with no channels");
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+	queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+	candidates: [MAX_SELECT_CHANNELS]int;
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_recv(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		state: uintptr;
+		for c, i in channels {
+			q := &queues[i];
+			q.state = &state;
+			raw_channel_wait_queue_insert(&c.recvq, q);
+		}
+		raw_channel_wait_queue_wait_on(&state);
+		for c, i in channels {
+			q := &queues[i];
+			raw_channel_wait_queue_remove(&c.recvq, q);
+		}
+
+		for c, i in channels {
+			if raw_channel_can_recv(c) {
+				candidates[count] = i;
+				count += 1;
+			}
+		}
+		assert(count != 0);
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	msg = channel_recv(channels[index]);
+
+	return;
+}
+
+select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) {
+	switch len(channels) {
+	case 0:
+		panic("sync: select with no channels");
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+	backing: [MAX_SELECT_CHANNELS]int;
+	queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+	candidates := backing[:];
+	cap := len(channels);
+	candidates = candidates[:cap];
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_recv(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		state: uintptr;
+		for c, i in channels {
+			q := &queues[i];
+			q.state = &state;
+			raw_channel_wait_queue_insert(&c.recvq, q);
+		}
+		raw_channel_wait_queue_wait_on(&state);
+		for c, i in channels {
+			q := &queues[i];
+			raw_channel_wait_queue_remove(&c.recvq, q);
+		}
+
+		for c, i in channels {
+			if raw_channel_can_recv(c) {
+				candidates[count] = i;
+				count += 1;
+			}
+		}
+		assert(count != 0);
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+
+	if msg != nil {
+		channel_send(channels[index], msg);
+	}
+
+	return;
+}
+
+select_send :: proc(channels: ..^Raw_Channel) -> (index: int) {
+	switch len(channels) {
+	case 0:
+		panic("sync: select with no channels");
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+	candidates: [MAX_SELECT_CHANNELS]int;
+	queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_send(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		state: uintptr;
+		for c, i in channels {
+			q := &queues[i];
+			q.state = &state;
+			raw_channel_wait_queue_insert(&c.sendq, q);
+		}
+		raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
+		for c, i in channels {
+			q := &queues[i];
+			raw_channel_wait_queue_remove(&c.sendq, q);
+		}
+
+		for c, i in channels {
+			if raw_channel_can_send(c) {
+				candidates[count] = i;
+				count += 1;
+			}
+		}
+		assert(count != 0);
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	return;
+}
+
+select_try :: proc(channels: ..Select_Channel) -> (index: int) {
+	switch len(channels) {
+	case 0:
+		panic("sync: select with no channels");
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+	backing: [MAX_SELECT_CHANNELS]int;
+	queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+	candidates := backing[:];
+	cap := len(channels);
+	candidates = candidates[:cap];
+
+	count := u32(0);
+	for c, i in channels {
+		switch c.command {
+		case .Recv:
+			if raw_channel_can_recv(c.channel) {
+				candidates[count] = i;
+				count += 1;
+			}
+		case .Send:
+			if raw_channel_can_send(c.channel) {
+				candidates[count] = i;
+				count += 1;
+			}
+		}
+	}
+
+	if count == 0 {
+		index = -1;
+		return;
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	return;
+}
+
+
+select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
+	switch len(channels) {
+	case 0:
+		index = -1;
+		return;
+	case 1:
+		index = -1;
+		if raw_channel_can_recv(channels[0]) {
+			index = 0;
+		}
+		return;
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+	candidates: [MAX_SELECT_CHANNELS]int;
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_recv(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		index = -1;
+		return;
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	return;
+}
+
+
+select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check {
+	switch len(channels) {
+	case 0:
+		return -1;
+	case 1:
+		if raw_channel_can_send(channels[0]) {
+			return 0;
+		}
+		return -1;
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+	candidates: [MAX_SELECT_CHANNELS]int;
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_send(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		index = -1;
+		return;
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	return;
+}
+
+select_try_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) {
+	switch len(channels) {
+	case 0:
+		index = 0;
+		return;
+	case 1:
+		if c := channels[0]; channel_can_recv(c) {
+			index = 0;
+			msg = channel_recv(c);
+			return;
+		}
+		return;
+	}
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+	candidates: [MAX_SELECT_CHANNELS]int;
+
+	count := u32(0);
+	for c, i in channels {
+		if channel_can_recv(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		index = -1;
+		return;
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	msg = channel_recv(channels[index]);
+	return;
+}
+
+select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) {
+	switch len(channels) {
+	case 0:
+		index = 0;
+		return;
+	case 1:
+		if c := channels[0]; channel_can_send(c) {
+			index = 0;
+			channel_send(c, msg);
+			return;
+		}
+		return;
+	}
+
+
+	assert(len(channels) <= MAX_SELECT_CHANNELS);
+	candidates: [MAX_SELECT_CHANNELS]int;
+
+	count := u32(0);
+	for c, i in channels {
+		if raw_channel_can_send(c) {
+			candidates[count] = i;
+			count += 1;
+		}
+	}
+
+	if count == 0 {
+		index = -1;
+		return;
+	}
+
+	t := time.now();
+	r := rand.create(transmute(u64)t);
+	i := rand.uint32(&r);
+
+	index = candidates[i % count];
+	channel_send(channels[index], msg);
+	return;
+}
+

+ 5 - 0
core/sync/sync_unix.odin

@@ -241,3 +241,8 @@ condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bo
 	return false;
 }
 
+
+
+thread_yield :: proc() {
+	unix.sched_yield();
+}

+ 8 - 2
core/sync/sync_windows.odin

@@ -61,7 +61,7 @@ Blocking_Mutex :: struct {
 
 
 blocking_mutex_init :: proc(m: ^Blocking_Mutex) {
-	m^ = Blocking_Mutex{};
+	win32.InitializeSRWLock(&m._handle);
 }
 
 blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) {
@@ -127,7 +127,7 @@ condition_wait_for :: proc(c: ^Condition) -> bool {
 	return false;
 }
 condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool {
-	ms := win32.DWORD((time.duration_nanoseconds(duration) + 999999)/1000000);
+	ms := win32.DWORD((max(time.duration_nanoseconds(duration), 0) + 999999)/1000000);
 	switch m in &c.mutex {
 	case ^Mutex:
 		return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, ms);
@@ -168,3 +168,9 @@ rw_lock_read_unlock :: proc(l: ^RW_Lock) {
 rw_lock_write_unlock :: proc(l: ^RW_Lock) {
 	win32.ReleaseSRWLockExclusive(&l._handle);
 }
+
+
+thread_yield :: proc() {
+	win32.SwitchToThread();
+}
+

+ 1 - 0
core/sys/windows/types.odin

@@ -46,6 +46,7 @@ LPOVERLAPPED :: ^OVERLAPPED;
 LPPROCESS_INFORMATION :: ^PROCESS_INFORMATION;
 LPSECURITY_ATTRIBUTES :: ^SECURITY_ATTRIBUTES;
 LPSTARTUPINFO :: ^STARTUPINFO;
+PVOID  :: rawptr;
 LPVOID :: rawptr;
 LPWCH :: ^WCHAR;
 LPWIN32_FIND_DATAW :: ^WIN32_FIND_DATAW;

+ 1 - 1
core/thread/thread.odin

@@ -31,7 +31,7 @@ run :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil, priority :=
 }
 
 
-run_with_data :: proc(fn: proc(data: rawptr), data: rawptr, init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) {
+run_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) {
 	thread_proc :: proc(t: ^Thread) {
 		fn := cast(proc(rawptr))t.data;
 		data := rawptr(uintptr(t.user_index));