Browse Source

Update `sync.Condition` to require a `^sync.Mutex` on init

gingerBill 5 years ago
parent
commit
9495e3d10c
2 changed files with 69 additions and 35 deletions
  1. 31 19
      core/sync/sync_unix.odin
  2. 38 16
      core/sync/sync_windows.odin

+ 31 - 19
core/sync/sync_unix.odin

@@ -12,6 +12,7 @@ Mutex :: struct {
 // one thread.
 // one thread.
 Condition :: struct {
 Condition :: struct {
 	handle: unix.pthread_cond_t,
 	handle: unix.pthread_cond_t,
+	mutex:  ^Mutex,
 
 
 	// NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent.
 	// NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent.
 	// This means that you may signal the condition before anyone is waiting to cause the
 	// This means that you may signal the condition before anyone is waiting to cause the
@@ -20,8 +21,6 @@ Condition :: struct {
 	// but not one that is about to wait, which can cause your program to become out of sync in
 	// but not one that is about to wait, which can cause your program to become out of sync in
 	// ways that are hard to debug or fix.
 	// ways that are hard to debug or fix.
 	flag: bool, // atomically mutated
 	flag: bool, // atomically mutated
-
-	mutex: Mutex,
 }
 }
 
 
 
 
@@ -54,46 +53,59 @@ mutex_unlock :: proc(m: ^Mutex) {
 }
 }
 
 
 
 
-condition_init :: proc(c: ^Condition) {
+condition_init :: proc(c: ^Condition, mutex: ^Mutex) -> bool {
 	// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition.
 	// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition.
 	attrs: unix.pthread_condattr_t;
 	attrs: unix.pthread_condattr_t;
-	assert(unix.pthread_condattr_init(&attrs) == 0);
+	if unix.pthread_condattr_init(&attrs) != 0 {
+		return false;
+	}
 	defer unix.pthread_condattr_destroy(&attrs); // ignores destruction error
 	defer unix.pthread_condattr_destroy(&attrs); // ignores destruction error
 
 
-	assert(unix.pthread_cond_init(&c.handle, &attrs) == 0);
-
-	mutex_init(&c.mutex);
 	c.flag = false;
 	c.flag = false;
+	c.mutex = mutex;
+	return unix.pthread_cond_init(&c.handle, &attrs) == 0;
 }
 }
 
 
 condition_destroy :: proc(c: ^Condition) {
 condition_destroy :: proc(c: ^Condition) {
 	assert(unix.pthread_cond_destroy(&c.handle) == 0);
 	assert(unix.pthread_cond_destroy(&c.handle) == 0);
-	mutex_destroy(&c.mutex);
 	c.handle = {};
 	c.handle = {};
 }
 }
 
 
-// Awaken exactly one thread who is waiting on the condition.
-condition_signal :: proc(c: ^Condition) {
-	mutex_lock(&c.mutex);
-	defer mutex_unlock(&c.mutex);
+// Awaken exactly one thread who is waiting on the condition
+condition_signal :: proc(c: ^Condition) -> bool {
+	mutex_lock(c.mutex);
+	defer mutex_unlock(c.mutex);
 	atomic_swap(&c.flag, true, .Sequentially_Consistent);
 	atomic_swap(&c.flag, true, .Sequentially_Consistent);
-	assert(unix.pthread_cond_signal(&c.handle) == 0);
+	return unix.pthread_cond_signal(&c.handle) == 0;
+}
+
+// Awaken all threads who are waiting on the condition
+condition_broadcast :: proc(c: ^Condition) -> bool {
+	return pthread_cond_broadcast(&c.handle) == 0;
 }
 }
 
 
 // Wait for the condition to be signalled.
 // Wait for the condition to be signalled.
 // Does not block if the condition has been signalled and no one
 // Does not block if the condition has been signalled and no one
 // has waited on it yet.
 // has waited on it yet.
-condition_wait_for :: proc(c: ^Condition) {
-	mutex_lock(&c.mutex);
-	defer mutex_unlock(&c.mutex);
+condition_wait_for :: proc(c: ^Condition) -> bool {
+	mutex_lock(c.mutex);
+	defer mutex_unlock(c.mutex);
 	// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
 	// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
 	// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
 	// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
 	// back to sleep.
 	// back to sleep.
 	// Though this overall behavior is the most sane, there may be a better way to do this that means that
 	// Though this overall behavior is the most sane, there may be a better way to do this that means that
 	// the first thread to wait, gets the flag first.
 	// the first thread to wait, gets the flag first.
-	if atomic_swap(&c.flag, false, .Sequentially_Consistent) do return;
+	if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
+		return true;
+	}
 	for {
 	for {
-		assert(unix.pthread_cond_wait(&c.handle, &c.mutex.handle) == 0);
-		if atomic_swap(&c.flag, false, .Sequentially_Consistent) do break;
+		if unix.pthread_cond_wait(&c.handle, &c.mutex.handle) != 0 {
+			return false;
+		}
+		if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
+			return true;
+		}
 	}
 	}
+
+	return false;
 }
 }

+ 38 - 16
core/sync/sync_windows.odin

@@ -3,15 +3,20 @@ package sync
 
 
 import "core:sys/win32"
 import "core:sys/win32"
 
 
+foreign import kernel32 "system:kernel32.lib"
+
 // A lock that can only be held by one thread at once.
 // A lock that can only be held by one thread at once.
 Mutex :: struct {
 Mutex :: struct {
 	_critical_section: win32.Critical_Section,
 	_critical_section: win32.Critical_Section,
 }
 }
 
 
+
 // Blocks until signalled.
 // Blocks until signalled.
 // When signalled, awakens exactly one waiting thread.
 // When signalled, awakens exactly one waiting thread.
 Condition :: struct {
 Condition :: struct {
-	event: win32.Handle,
+	_handle: WIN32_CONDITION_VARIABLE,
+
+	mutex: ^Mutex,
 }
 }
 
 
 // When waited upon, blocks until the internal count is greater than zero, then subtracts one.
 // When waited upon, blocks until the internal count is greater than zero, then subtracts one.
@@ -60,27 +65,44 @@ mutex_unlock :: proc(m: ^Mutex) {
 	win32.leave_critical_section(&m._critical_section);
 	win32.leave_critical_section(&m._critical_section);
 }
 }
 
 
+@private WIN32_CONDITION_VARIABLE :: distinct rawptr;
+@private
+foreign kernel32 {
+	InitializeConditionVariable :: proc(ConditionVariable: ^WIN32_CONDITION_VARIABLE) ---
+	WakeConditionVariable :: proc(ConditionVariable: ^WIN32_CONDITION_VARIABLE) ---
+	WakeAllConditionVariable :: proc(ConditionVariable: ^WIN32_CONDITION_VARIABLE) ---
+	SleepConditionVariableCS :: proc(ConditionVariable: ^WIN32_CONDITION_VARIABLE, CriticalSection: ^win32.Critical_Section, dwMilliseconds: u32) -> b32 ---
+}
 
 
-condition_init :: proc(using c: ^Condition) {
-	// create an auto-reset event.
-	// NOTE(tetra, 2019-10-30): this will, when signalled, signal exactly one waiting thread
-	// and then reset itself automatically.
-	event = win32.create_event_w(nil, false, false, nil);
-	assert(event != nil);
+condition_init :: proc(c: ^Condition, mutex: ^Mutex) -> bool {
+	assert(mutex != nil);
+	InitializeConditionVariable(&c._handle);
+	c.mutex = mutex;
+	return c._handle != nil;
 }
 }
 
 
-condition_destroy :: proc(using c: ^Condition) {
-	if event != nil {
-		win32.close_handle(event);
+condition_destroy :: proc(c: ^Condition) {
+	if c._handle != nil {
+		WakeAllConditionVariable(&c._handle);
 	}
 	}
 }
 }
 
 
-condition_signal :: proc(using c: ^Condition) {
-	ok := win32.set_event(event);
-	assert(bool(ok));
+condition_signal :: proc(c: ^Condition) -> bool {
+	if c._handle == nil {
+		return false;
+	}
+	WakeConditionVariable(&c._handle);
+	return true;
 }
 }
 
 
-condition_wait_for :: proc(using c: ^Condition) {
-	result := win32.wait_for_single_object(event, win32.INFINITE);
-	assert(result != win32.WAIT_FAILED);
+condition_broadcast :: proc(c: ^Condition) -> bool {
+	if c._handle == nil {
+		return false;
+	}
+	WakeAllConditionVariable(&c._handle);
+	return true;
+}
+
+condition_wait_for :: proc(c: ^Condition) -> bool {
+	return cast(bool)SleepConditionVariableCS(&c._handle, &c.mutex._critical_section, win32.INFINITE);
 }
 }