Browse Source

Merge pull request #1746 from odin-lang/sync-cond-futex

`core:sync` Improvements
gingerBill 3 years ago
parent
commit
b758c696f2

+ 1 - 0
core/intrinsics/intrinsics.odin

@@ -141,6 +141,7 @@ type_is_valid_matrix_elements :: proc($T: typeid) -> bool ---
 
 type_is_named            :: proc($T: typeid) -> bool ---
 type_is_pointer          :: proc($T: typeid) -> bool ---
+type_is_multi_pointer    :: proc($T: typeid) -> bool ---
 type_is_array            :: proc($T: typeid) -> bool ---
 type_is_enumerated_array :: proc($T: typeid) -> bool ---
 type_is_slice            :: proc($T: typeid) -> bool ---

+ 2 - 2
core/sync/atomic.odin

@@ -6,8 +6,8 @@ cpu_relax :: intrinsics.cpu_relax
 
 /*
 Atomic_Memory_Order :: enum {
-	Relaxed = 0,
-	Consume = 1,
+	Relaxed = 0, // Unordered
+	Consume = 1, // Monotonic
 	Acquire = 2,
 	Release = 3,
 	Acq_Rel = 4,

+ 61 - 10
core/sync/extended.odin

@@ -16,8 +16,7 @@ wait_group_add :: proc(wg: ^Wait_Group, delta: int) {
 		return
 	}
 
-	mutex_lock(&wg.mutex)
-	defer mutex_unlock(&wg.mutex)
+	guard(&wg.mutex)
 
 	atomic_add(&wg.counter, delta)
 	if wg.counter < 0 {
@@ -36,8 +35,7 @@ wait_group_done :: proc(wg: ^Wait_Group) {
 }
 
 wait_group_wait :: proc(wg: ^Wait_Group) {
-	mutex_lock(&wg.mutex)
-	defer mutex_unlock(&wg.mutex)
+	guard(&wg.mutex)
 
 	if wg.counter != 0 {
 		cond_wait(&wg.cond, &wg.mutex)
@@ -51,8 +49,7 @@ wait_group_wait_with_timeout :: proc(wg: ^Wait_Group, duration: time.Duration) -
 	if duration <= 0 {
 		return false
 	}
-	mutex_lock(&wg.mutex)
-	defer mutex_unlock(&wg.mutex)
+	guard(&wg.mutex)
 
 	if wg.counter != 0 {
 		if !cond_wait_with_timeout(&wg.cond, &wg.mutex, duration) {
@@ -119,8 +116,7 @@ barrier_init :: proc(b: ^Barrier, thread_count: int) {
 // Block the current thread until all threads have rendezvoused
 // Barrier can be reused after all threads rendezvoused once, and can be used continuously
 barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) {
-	mutex_lock(&b.mutex)
-	defer mutex_unlock(&b.mutex)
+	guard(&b.mutex)
 	local_gen := b.generation_id
 	b.index += 1
 	if b.index < b.thread_count {
@@ -289,8 +285,7 @@ Once :: struct {
 once_do :: proc(o: ^Once, fn: proc()) {
 	@(cold)
 	do_slow :: proc(o: ^Once, fn: proc()) {
-		mutex_lock(&o.m)
-		defer mutex_unlock(&o.m)
+		guard(&o.m)
 		if !o.done {
 			fn()
 			atomic_store_explicit(&o.done, true, .Release)
@@ -302,3 +297,59 @@ once_do :: proc(o: ^Once, fn: proc()) {
 		do_slow(o, fn)
 	}
 }
+
+
+
+// A Parker is an associated token which is initially not present:
+//     * The `park` procedure blocks the current thread unless or until the token
+//       is available, at which point the token is consumed.
+//     * The `park_with_timeout` procedures works the same as `park` but only
+//       blocks for the specified duration.
+//     * The `unpark` procedure automatically makes the token available if it
+//       was not already.
+Parker :: struct {
+	state: Futex,
+}
+
+// Blocks the current thread until the token is made available.
+//
+// Assumes this is only called by the thread that owns the Parker.
+park :: proc(p: ^Parker) {
+	EMPTY    :: 0
+	NOTIFIED :: 1
+	PARKED   :: max(u32)
+	if atomic_sub_explicit(&p.state, 1, .Acquire) == NOTIFIED {
+		return
+	}
+	for {
+		futex_wait(&p.state, PARKED)
+		if _, ok := atomic_compare_exchange_strong_explicit(&p.state, NOTIFIED, EMPTY, .Acquire, .Acquire); ok {
+			return
+		}
+	}
+}
+
+// Blocks the current thread until the token is made available, but only
+// for a limited duration.
+//
+// Assumes this is only called by the thread that owns the Parker
+park_with_timeout :: proc(p: ^Parker, duration: time.Duration) {
+	EMPTY    :: 0
+	NOTIFIED :: 1
+	PARKED   :: max(u32)
+	if atomic_sub_explicit(&p.state, 1, .Acquire) == NOTIFIED {
+		return
+	}
+	futex_wait_with_timeout(&p.state, PARKED, duration)
+	atomic_exchange_explicit(&p.state, EMPTY, .Acquire)
+}
+
+// Automatically makes thee token available if it was not already.
+unpark :: proc(p: ^Parker)  {
+	EMPTY    :: 0
+	NOTIFIED :: 1
+	PARKED   :: max(Futex)
+	if atomic_exchange_explicit(&p.state, NOTIFIED, .Release) == PARKED {
+		futex_signal(&p.state)
+	}
+}

+ 2 - 2
core/sync/primitives.odin

@@ -195,7 +195,7 @@ sema_wait_with_timeout :: proc(s: ^Sema, duration: time.Duration) -> bool {
 Futex :: distinct u32
 
 futex_wait :: proc(f: ^Futex, expected: u32) {
-	if u32(atomic_load(f)) != expected {
+	if u32(atomic_load_explicit(f, .Acquire)) != expected {
 		return
 	}
 	
@@ -204,7 +204,7 @@ futex_wait :: proc(f: ^Futex, expected: u32) {
 
 // returns true if the wait happened within the duration, false if it exceeded the time duration
 futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> bool {
-	if u32(atomic_load(f)) != expected {
+	if u32(atomic_load_explicit(f, .Acquire)) != expected {
 		return true
 	}
 	if duration <= 0 {

+ 13 - 93
core/sync/primitives_atomic.odin

@@ -281,118 +281,39 @@ atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
 
 
 
-
-@(private="file")
-Queue_Item :: struct {
-	next: ^Queue_Item,
-	futex: Futex,
-}
-
-@(private="file")
-queue_item_wait :: proc(item: ^Queue_Item) {
-	for atomic_load_explicit(&item.futex, .Acquire) == 0 {
-		futex_wait(&item.futex, 0)
-		cpu_relax()
-	}
-}
-@(private="file")
-queue_item_wait_with_timeout :: proc(item: ^Queue_Item, duration: time.Duration) -> bool {
-	start := time.tick_now()
-	for atomic_load_explicit(&item.futex, .Acquire) == 0 {
-		remaining := duration - time.tick_since(start)
-		if remaining < 0 {
-			return false
-		}
-		if !futex_wait_with_timeout(&item.futex, 0, remaining) {
-			return false
-		}
-		cpu_relax()
-	}
-	return true
-}
-@(private="file")
-queue_item_signal :: proc(item: ^Queue_Item) {
-	atomic_store_explicit(&item.futex, 1, .Release)
-	futex_signal(&item.futex)
-}
-
-
 // Atomic_Cond implements a condition variable, a rendezvous point for threads
 // waiting for signalling the occurence of an event
 //
 // An Atomic_Cond must not be copied after first use
 Atomic_Cond :: struct {
-	queue_mutex: Atomic_Mutex,
-	queue_head:  ^Queue_Item,
-	pending:     bool,
+	state: Futex,
 }
 
 atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) {
-	waiter := &Queue_Item{}
-
-	atomic_mutex_lock(&c.queue_mutex)
-	waiter.next = c.queue_head
-	c.queue_head = waiter
+	state := u32(atomic_load_explicit(&c.state, .Relaxed))
+	unlock(m)
+	futex_wait(&c.state, state)
+	lock(m)
 
-	atomic_store(&c.pending, true)
-	atomic_mutex_unlock(&c.queue_mutex)
-
-	atomic_mutex_unlock(m)
-	queue_item_wait(waiter)
-	atomic_mutex_lock(m)
 }
 
 atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, duration: time.Duration) -> (ok: bool) {
-	waiter := &Queue_Item{}
-
-	atomic_mutex_lock(&c.queue_mutex)
-	waiter.next = c.queue_head
-	c.queue_head = waiter
-
-	atomic_store(&c.pending, true)
-	atomic_mutex_unlock(&c.queue_mutex)
-
-	atomic_mutex_unlock(m)
-	ok = queue_item_wait_with_timeout(waiter, duration)
-	atomic_mutex_lock(m)
+	state := u32(atomic_load_explicit(&c.state, .Relaxed))
+	unlock(m)
+	ok = futex_wait_with_timeout(&c.state, state, duration)
+	lock(m)
 	return
 }
 
 
 atomic_cond_signal :: proc(c: ^Atomic_Cond) {
-	if !atomic_load(&c.pending) {
-		return
-	}
-
-	atomic_mutex_lock(&c.queue_mutex)
-	waiter := c.queue_head
-	if c.queue_head != nil {
-		c.queue_head = c.queue_head.next
-	}
-	atomic_store(&c.pending, c.queue_head != nil)
-	atomic_mutex_unlock(&c.queue_mutex)
-
-	if waiter != nil {
-		queue_item_signal(waiter)
-	}
+	atomic_add_explicit(&c.state, 1, .Release)
+	futex_signal(&c.state)
 }
 
 atomic_cond_broadcast :: proc(c: ^Atomic_Cond) {
-	if !atomic_load(&c.pending) {
-		return
-	}
-
-	atomic_store(&c.pending, false)
-
-	atomic_mutex_lock(&c.queue_mutex)
-	waiters := c.queue_head
-	c.queue_head = nil
-	atomic_mutex_unlock(&c.queue_mutex)
-
-	for waiters != nil {
-		queue_item_signal(waiters)
-		waiters = waiters.next
-	}
+	atomic_add_explicit(&c.state, 1, .Release)
+	futex_broadcast(&c.state)
 }
 
 // When waited upon, blocks until the internal count is greater than zero, then subtracts one.
@@ -430,7 +351,6 @@ atomic_sema_wait_with_timeout :: proc(s: ^Atomic_Sema, duration: time.Duration)
 		return false
 	}
 	for {
-
 		original_count := atomic_load_explicit(&s.count, .Relaxed)
 		for start := time.tick_now(); original_count == 0; /**/ {
 			remaining := duration - time.tick_since(start)

+ 0 - 39
core/sync/primitives_darwin.odin

@@ -3,7 +3,6 @@
 package sync
 
 import "core:c"
-import "core:time"
 import "core:intrinsics"
 
 foreign import pthread "System.framework"
@@ -17,41 +16,3 @@ _current_thread_id :: proc "contextless" () -> int {
 	pthread_threadid_np(nil, &tid)
 	return int(tid)
 }
-
-
-
-_Mutex :: struct {
-	mutex: Atomic_Mutex,
-}
-
-_mutex_lock :: proc(m: ^Mutex) {
-	atomic_mutex_lock(&m.impl.mutex)
-}
-
-_mutex_unlock :: proc(m: ^Mutex) {
-	atomic_mutex_unlock(&m.impl.mutex)
-}
-
-_mutex_try_lock :: proc(m: ^Mutex) -> bool {
-	return atomic_mutex_try_lock(&m.impl.mutex)
-}
-
-_Cond :: struct {
-	cond: Atomic_Cond,
-}
-
-_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
-	atomic_cond_wait(&c.impl.cond, &m.impl.mutex)
-}
-
-_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool {
-	return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration)
-}
-
-_cond_signal :: proc(c: ^Cond) {
-	atomic_cond_signal(&c.impl.cond)
-}
-
-_cond_broadcast :: proc(c: ^Cond) {
-	atomic_cond_broadcast(&c.impl.cond)
-}

+ 0 - 37
core/sync/primitives_freebsd.odin

@@ -3,44 +3,7 @@
 package sync
 
 import "core:os"
-import "core:time"
 
 _current_thread_id :: proc "contextless" () -> int {
 	return os.current_thread_id()
 }
-
-_Mutex :: struct {
-	mutex: Atomic_Mutex,
-}
-
-_mutex_lock :: proc(m: ^Mutex) {
-	atomic_mutex_lock(&m.impl.mutex)
-}
-
-_mutex_unlock :: proc(m: ^Mutex) {
-	atomic_mutex_unlock(&m.impl.mutex)
-}
-
-_mutex_try_lock :: proc(m: ^Mutex) -> bool {
-	return atomic_mutex_try_lock(&m.impl.mutex)
-}
-
-_Cond :: struct {
-	cond: Atomic_Cond,
-}
-
-_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
-	atomic_cond_wait(&c.impl.cond, &m.impl.mutex)
-}
-
-_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool {
-	return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration)
-}
-
-_cond_signal :: proc(c: ^Cond) {
-	atomic_cond_signal(&c.impl.cond)
-}
-
-_cond_broadcast :: proc(c: ^Cond) {
-	atomic_cond_broadcast(&c.impl.cond)
-}

+ 80 - 72
core/sync/primitives_internal.odin

@@ -1,99 +1,108 @@
 //+private
 package sync
 
-when #config(ODIN_SYNC_RECURSIVE_MUTEX_USE_FUTEX, true) {
-	_Recursive_Mutex :: struct {
-		owner:     Futex,
-		recursion: i32,
-	}
-
-	_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
-		tid := Futex(current_thread_id())
-		for {
-			prev_owner := atomic_compare_exchange_strong_explicit(&m.impl.owner, 0, tid, .Acquire, .Acquire)
-			switch prev_owner {
-			case 0, tid:
-				m.impl.recursion += 1
-				// inside the lock
-				return
-			}
-
-			futex_wait(&m.impl.owner, u32(prev_owner))
-		}
-	}
+import "core:time"
 
-	_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
-		m.impl.recursion -= 1
-		if m.impl.recursion != 0 {
-			return
-		}
-		atomic_exchange_explicit(&m.impl.owner, 0, .Release)
-		
-		futex_signal(&m.impl.owner)
-		// outside the lock
+_Sema :: struct {
+	atomic: Atomic_Sema,
+}
 
-	}
+_sema_post :: proc(s: ^Sema, count := 1) {
+	atomic_sema_post(&s.impl.atomic, count)
+}
 
-	_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
-		tid := Futex(current_thread_id())
+_sema_wait :: proc(s: ^Sema) {
+	atomic_sema_wait(&s.impl.atomic)
+}
+
+_sema_wait_with_timeout :: proc(s: ^Sema, duration: time.Duration) -> bool {
+	return atomic_sema_wait_with_timeout(&s.impl.atomic, duration)
+}
+
+
+_Recursive_Mutex :: struct {
+	owner:     Futex,
+	recursion: i32,
+}
+
+_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
+	tid := Futex(current_thread_id())
+	for {
 		prev_owner := atomic_compare_exchange_strong_explicit(&m.impl.owner, 0, tid, .Acquire, .Acquire)
 		switch prev_owner {
 		case 0, tid:
 			m.impl.recursion += 1
 			// inside the lock
-			return true
+			return
 		}
-		return false
-	}
-} else {
-	_Recursive_Mutex :: struct {
-		owner:     int,
-		recursion: int,
-		mutex:     Mutex,
+
+		futex_wait(&m.impl.owner, u32(prev_owner))
 	}
+}
 
-	_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
-		tid := current_thread_id()
-		if tid != m.impl.owner {
-			mutex_lock(&m.impl.mutex)
-		}
-		// inside the lock
-		m.impl.owner = tid
-		m.impl.recursion += 1
+_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
+	m.impl.recursion -= 1
+	if m.impl.recursion != 0 {
+		return
 	}
+	atomic_exchange_explicit(&m.impl.owner, 0, .Release)
 
-	_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
-		tid := current_thread_id()
-		assert(tid == m.impl.owner)
-		m.impl.recursion -= 1
-		recursion := m.impl.recursion
-		if recursion == 0 {
-			m.impl.owner = 0
-		}
-		if recursion == 0 {
-			mutex_unlock(&m.impl.mutex)
-		}
-		// outside the lock
+	futex_signal(&m.impl.owner)
+	// outside the lock
 
-	}
+}
 
-	_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
-		tid := current_thread_id()
-		if m.impl.owner == tid {
-			return mutex_try_lock(&m.impl.mutex)
-		}
-		if !mutex_try_lock(&m.impl.mutex) {
-			return false
-		}
-		// inside the lock
-		m.impl.owner = tid
+_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
+	tid := Futex(current_thread_id())
+	prev_owner := atomic_compare_exchange_strong_explicit(&m.impl.owner, 0, tid, .Acquire, .Acquire)
+	switch prev_owner {
+	case 0, tid:
 		m.impl.recursion += 1
+		// inside the lock
 		return true
 	}
+	return false
 }
 
 
 when ODIN_OS != .Windows {
+	_Mutex :: struct {
+		mutex: Atomic_Mutex,
+	}
+
+	_mutex_lock :: proc(m: ^Mutex) {
+		atomic_mutex_lock(&m.impl.mutex)
+	}
+
+	_mutex_unlock :: proc(m: ^Mutex) {
+		atomic_mutex_unlock(&m.impl.mutex)
+	}
+
+	_mutex_try_lock :: proc(m: ^Mutex) -> bool {
+		return atomic_mutex_try_lock(&m.impl.mutex)
+	}
+
+	_Cond :: struct {
+		cond: Atomic_Cond,
+	}
+
+	_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
+		atomic_cond_wait(&c.impl.cond, &m.impl.mutex)
+	}
+
+	_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool {
+		return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration)
+	}
+
+	_cond_signal :: proc(c: ^Cond) {
+		atomic_cond_signal(&c.impl.cond)
+	}
+
+	_cond_broadcast :: proc(c: ^Cond) {
+		atomic_cond_broadcast(&c.impl.cond)
+	}
+
+
 	_RW_Mutex :: struct {
 		mutex: Atomic_RW_Mutex,
 	}
@@ -121,5 +130,4 @@ when ODIN_OS != .Windows {
 	_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
 		return atomic_rw_mutex_try_shared_lock(&rw.impl.mutex)
 	}
-
 }

+ 0 - 38
core/sync/primitives_linux.odin

@@ -3,45 +3,7 @@
 package sync
 
 import "core:sys/unix"
-import "core:time"
 
 _current_thread_id :: proc "contextless" () -> int {
 	return unix.sys_gettid()
 }
-
-
-_Mutex :: struct {
-	mutex: Atomic_Mutex,
-}
-
-_mutex_lock :: proc(m: ^Mutex) {
-	atomic_mutex_lock(&m.impl.mutex)
-}
-
-_mutex_unlock :: proc(m: ^Mutex) {
-	atomic_mutex_unlock(&m.impl.mutex)
-}
-
-_mutex_try_lock :: proc(m: ^Mutex) -> bool {
-	return atomic_mutex_try_lock(&m.impl.mutex)
-}
-
-_Cond :: struct {
-	cond: Atomic_Cond,
-}
-
-_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
-	atomic_cond_wait(&c.impl.cond, &m.impl.mutex)
-}
-
-_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool {
-	return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration)
-}
-
-_cond_signal :: proc(c: ^Cond) {
-	atomic_cond_signal(&c.impl.cond)
-}
-
-_cond_broadcast :: proc(c: ^Cond) {
-	atomic_cond_broadcast(&c.impl.cond)
-}

+ 0 - 37
core/sync/primitives_openbsd.odin

@@ -3,44 +3,7 @@
 package sync
 
 import "core:os"
-import "core:time"
 
 _current_thread_id :: proc "contextless" () -> int {
 	return os.current_thread_id()
 }
-
-_Mutex :: struct {
-	mutex: Atomic_Mutex,
-}
-
-_mutex_lock :: proc(m: ^Mutex) {
-	atomic_mutex_lock(&m.impl.mutex)
-}
-
-_mutex_unlock :: proc(m: ^Mutex) {
-	atomic_mutex_unlock(&m.impl.mutex)
-}
-
-_mutex_try_lock :: proc(m: ^Mutex) -> bool {
-	return atomic_mutex_try_lock(&m.impl.mutex)
-}
-
-_Cond :: struct {
-	cond: Atomic_Cond,
-}
-
-_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
-	atomic_cond_wait(&c.impl.cond, &m.impl.mutex)
-}
-
-_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool {
-	return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration)
-}
-
-_cond_signal :: proc(c: ^Cond) {
-	atomic_cond_signal(&c.impl.cond)
-}
-
-_cond_broadcast :: proc(c: ^Cond) {
-	atomic_cond_broadcast(&c.impl.cond)
-}

+ 0 - 39
core/sync/sema_internal.odin

@@ -1,39 +0,0 @@
-//+private
-package sync
-
-import "core:time"
-
-
-when #config(ODIN_SYNC_SEMA_USE_FUTEX, true) {
-	_Sema :: struct {
-		atomic: Atomic_Sema,
-	}
-
-	_sema_post :: proc(s: ^Sema, count := 1) {
-		atomic_sema_post(&s.impl.atomic, count)
-	}
-
-	_sema_wait :: proc(s: ^Sema) {
-		atomic_sema_wait(&s.impl.atomic)
-	}
-
-	_sema_wait_with_timeout :: proc(s: ^Sema, duration: time.Duration) -> bool {
-		return atomic_sema_wait_with_timeout(&s.impl.atomic, duration)
-	}
-} else {
-	_Sema :: struct {
-		wg: Wait_Group,
-	}
-
-	_sema_post :: proc(s: ^Sema, count := 1) {
-		wait_group_add(&s.impl.wg, count)
-	}
-
-	_sema_wait :: proc(s: ^Sema) {
-		wait_group_wait(&s.impl.wg)
-	}
-
-	_sema_wait_with_timeout :: proc(s: ^Sema, duration: time.Duration) -> bool {
-		return wait_group_wait_with_timeout(&s.impl.wg, duration)
-	}
-}