浏览代码

Add `sync.Wait_Group`

gingerBill 5 年之前
父节点
当前提交
3a1492fc99
共有 2 个文件被更改,包括 87 次插入5 次删除
  1. 29 5
      core/sync/channel.odin
  2. 58 0
      core/sync/wait_group.odin

+ 29 - 5
core/sync/channel.odin

@@ -1,6 +1,5 @@
 package sync
 package sync
 
 
-// import "core:fmt"
 import "core:mem"
 import "core:mem"
 import "core:time"
 import "core:time"
 import "core:intrinsics"
 import "core:intrinsics"
@@ -113,17 +112,32 @@ channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) {
 }
 }
 
 
 
 
-channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, ok: bool) {
+channel_iterator :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) {
 	c := ch._internal;
 	c := ch._internal;
 	if c == nil {
 	if c == nil {
 		return;
 		return;
 	}
 	}
 
 
 	if !c.closed || c.len > 0 {
 	if !c.closed || c.len > 0 {
-		val, ok = channel_recv(ch), true;
+		msg, ok = channel_recv(ch), true;
 	}
 	}
 	return;
 	return;
 }
 }
+channel_drain :: proc(ch: $C/Channel($T)) {
+	raw_channel_drain(ch._internal);
+}
+
+
+channel_move :: proc(dst, src: $C/Channel($T)) {
+	// for channel_len(src) > 0 {
+	// 	msg := channel_recv(src);
+	// 	channel_send(dst, msg);
+	// }
+	for msg in channel_iterator(src) {
+		channel_send(dst, msg);
+	}
+}
+
 
 
 
 
 channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
 channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
@@ -247,8 +261,6 @@ channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: i
 
 
 
 
 
 
-
-
 Raw_Channel :: struct {
 Raw_Channel :: struct {
 	data:        rawptr,
 	data:        rawptr,
 	elem_size:   int,
 	elem_size:   int,
@@ -393,3 +405,15 @@ raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) {
 	mutex_unlock(&c.mutex);
 	mutex_unlock(&c.mutex);
 	return;
 	return;
 }
 }
+
+
+raw_channel_drain :: proc(c: ^Raw_Channel) {
+	if c == nil {
+		return;
+	}
+	mutex_lock(&c.mutex);
+	c.len   = 0;
+	c.read  = 0;
+	c.write = 0;
+	mutex_unlock(&c.mutex);
+}

+ 58 - 0
core/sync/wait_group.odin

@@ -0,0 +1,58 @@
+package sync
+
+import "intrinsics"
+
+Wait_Group :: struct {
+	counter: int,
+	mutex:   Blocking_Mutex,
+	cond:    Condition,
+}
+
+wait_group_init :: proc(wg: ^Wait_Group) {
+	wg.counter = 0;
+	blocking_mutex_init(&wg.mutex);
+	condition_init(&wg.cond, &wg.mutex);
+}
+
+
+wait_group_destroy :: proc(wg: ^Wait_Group) {
+	condition_destroy(&wg.cond);
+	blocking_mutex_destroy(&wg.mutex);
+}
+
+wait_group_add :: proc(wg: ^Wait_Group, delta: int) {
+	if delta == 0 {
+		return;
+	}
+
+	blocking_mutex_lock(&wg.mutex);
+	defer blocking_mutex_unlock(&wg.mutex);
+
+	intrinsics.atomic_add(&wg.counter, delta);
+	if wg.counter < 0 {
+		panic("sync.Wait_Group negative counter");
+	}
+	if wg.counter == 0 {
+		condition_broadcast(&wg.cond);
+		if wg.counter != 0 {
+			panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait");
+		}
+	}
+}
+
+wait_group_done :: proc(wg: ^Wait_Group) {
+	wait_group_add(wg, -1);
+}
+
+wait_group_wait :: proc(wg: ^Wait_Group) {
+	blocking_mutex_lock(&wg.mutex);
+	defer blocking_mutex_unlock(&wg.mutex);
+
+	if wg.counter != 0 {
+		condition_wait_for(&wg.cond);
+		if wg.counter != 0 {
+			panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait");
+		}
+	}
+}
+