primitives_atomic.odin

package sync

import "core:time"

Atomic_Mutex_State :: enum Futex {
	Unlocked = 0,
	Locked   = 1,
	Waiting  = 2,
}
// An Atomic_Mutex is a mutual exclusion lock.
// The zero value for an Atomic_Mutex is an unlocked mutex.
//
// An Atomic_Mutex must not be copied after first use.
Atomic_Mutex :: struct #no_copy {
	state: Atomic_Mutex_State,
}

// atomic_mutex_lock locks m.
atomic_mutex_lock :: proc "contextless" (m: ^Atomic_Mutex) {
	@(cold)
	lock_slow :: proc "contextless" (m: ^Atomic_Mutex, curr_state: Atomic_Mutex_State) {
		new_state := curr_state // Make a copy of it

		spin_lock: for spin in 0..<i32(100) {
			state, ok := atomic_compare_exchange_weak_explicit(&m.state, .Unlocked, new_state, .Acquire, .Consume)
			if ok {
				return
			}

			if state == .Waiting {
				break spin_lock
			}

			for i := min(spin+1, 32); i > 0; i -= 1 {
				cpu_relax()
			}
		}

		// Set just in case 100 iterations did not do it
		new_state = .Waiting

		for {
			if atomic_exchange_explicit(&m.state, .Waiting, .Acquire) == .Unlocked {
				return
			}

			futex_wait((^Futex)(&m.state), u32(new_state))
			cpu_relax()
		}
	}

	if v := atomic_exchange_explicit(&m.state, .Locked, .Acquire); v != .Unlocked {
		lock_slow(m, v)
	}
}
// atomic_mutex_unlock unlocks m.
atomic_mutex_unlock :: proc "contextless" (m: ^Atomic_Mutex) {
	@(cold)
	unlock_slow :: proc "contextless" (m: ^Atomic_Mutex) {
		futex_signal((^Futex)(&m.state))
	}

	switch atomic_exchange_explicit(&m.state, .Unlocked, .Release) {
	case .Unlocked:
		// Kind of okay - unlocking while already unlocked.
	case .Locked:
		// Okay
	case .Waiting:
		unlock_slow(m)
	}
}
// atomic_mutex_try_lock tries to lock m, returning true on success and false on failure.
atomic_mutex_try_lock :: proc "contextless" (m: ^Atomic_Mutex) -> bool {
	_, ok := atomic_compare_exchange_strong_explicit(&m.state, .Unlocked, .Locked, .Acquire, .Consume)
	return ok
}
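/*
A minimal usage sketch for atomic_mutex_try_lock; `stats_mutex` and
`flush_stats` are hypothetical names used for illustration only.

Example:
	if atomic_mutex_try_lock(&stats_mutex) {
		defer atomic_mutex_unlock(&stats_mutex)
		flush_stats() // do the work only if no other thread holds the lock
	}
*/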
/*
Example:
	if atomic_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_mutex_unlock)
atomic_mutex_guard :: proc "contextless" (m: ^Atomic_Mutex) -> bool {
	atomic_mutex_lock(m)
	return true
}
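/*
A minimal usage sketch of the plain lock/unlock pair, with the unlock placed
in a defer; `counter_mutex` and `counter` are hypothetical names used for
illustration only.

Example:
	counter_mutex: Atomic_Mutex
	counter: int

	increment :: proc "contextless" () {
		atomic_mutex_lock(&counter_mutex)
		defer atomic_mutex_unlock(&counter_mutex)
		counter += 1
	}
*/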
Atomic_RW_Mutex_State :: distinct uint
Atomic_RW_Mutex_State_Is_Writing  :: Atomic_RW_Mutex_State(1) << (size_of(Atomic_RW_Mutex_State)*8 - 1)
Atomic_RW_Mutex_State_Reader      :: Atomic_RW_Mutex_State(1)
Atomic_RW_Mutex_State_Reader_Mask :: ~Atomic_RW_Mutex_State_Is_Writing
// An Atomic_RW_Mutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for an Atomic_RW_Mutex is an unlocked mutex.
//
// An Atomic_RW_Mutex must not be copied after first use.
Atomic_RW_Mutex :: struct #no_copy {
	state: Atomic_RW_Mutex_State,
	mutex: Atomic_Mutex,
	sema:  Atomic_Sema,
}
// atomic_rw_mutex_lock locks rw for writing (with a single writer).
// If the mutex is already locked for reading or writing, the calling thread blocks until the mutex is available.
atomic_rw_mutex_lock :: proc "contextless" (rw: ^Atomic_RW_Mutex) {
	atomic_mutex_lock(&rw.mutex)
	state := atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing)
	if state & Atomic_RW_Mutex_State_Reader_Mask != 0 {
		// There's at least one reader, so wait for the last one to post the semaphore.
		//
		// Because we hold the exclusive lock, no more readers can come in
		// during this time, which prevents any situation where the last
		// reader is pre-empted around the count turning zero, which would
		// result in the potential for another reader to run amok after the
		// other posts.
		atomic_sema_wait(&rw.sema)
	}
}
// atomic_rw_mutex_unlock unlocks rw for writing (with a single writer).
atomic_rw_mutex_unlock :: proc "contextless" (rw: ^Atomic_RW_Mutex) {
	_ = atomic_and(&rw.state, ~Atomic_RW_Mutex_State_Is_Writing)
	atomic_mutex_unlock(&rw.mutex)
}
// atomic_rw_mutex_try_lock tries to lock rw for writing (with a single writer).
atomic_rw_mutex_try_lock :: proc "contextless" (rw: ^Atomic_RW_Mutex) -> bool {
	if atomic_mutex_try_lock(&rw.mutex) {
		state := atomic_load(&rw.state)
		if state & Atomic_RW_Mutex_State_Reader_Mask == 0 {
			// Compare-and-exchange for absolute certainty that no one has come in to read.
			_, ok := atomic_compare_exchange_strong(&rw.state, state, state | Atomic_RW_Mutex_State_Is_Writing)
			if ok {
				return true
			}
		}

		// A reader is active or came in while we have the lock, so we need to
		// back out.
		atomic_mutex_unlock(&rw.mutex)
	}
	return false
}
// atomic_rw_mutex_shared_lock locks rw for reading (with an arbitrary number of readers).
atomic_rw_mutex_shared_lock :: proc "contextless" (rw: ^Atomic_RW_Mutex) {
	state := atomic_load(&rw.state)
	for state & Atomic_RW_Mutex_State_Is_Writing == 0 {
		ok: bool
		state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader)
		if ok {
			// We successfully took the shared reader lock without any writers intervening.
			return
		}
	}

	// A writer is active or came in while we were trying to get a shared
	// reader lock, so now we must take the full lock in order to wait for the
	// writer to give it up.
	atomic_mutex_lock(&rw.mutex)

	// At this point, we have the lock, so we can add to the reader count.
	_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader)

	// Then we give up the lock to let other readers (or writers) come through.
	atomic_mutex_unlock(&rw.mutex)
}
// atomic_rw_mutex_shared_unlock unlocks rw for reading (with an arbitrary number of readers).
atomic_rw_mutex_shared_unlock :: proc "contextless" (rw: ^Atomic_RW_Mutex) {
	state := atomic_sub(&rw.state, Atomic_RW_Mutex_State_Reader)
	if (state & Atomic_RW_Mutex_State_Reader_Mask == Atomic_RW_Mutex_State_Reader) &&
	   (state & Atomic_RW_Mutex_State_Is_Writing != 0) {
		// We were the last reader, so post the semaphore for the writer
		// holding the lock, who is waiting to continue.
		atomic_sema_post(&rw.sema)
	}
}
// atomic_rw_mutex_try_shared_lock tries to lock rw for reading (with an arbitrary number of readers).
atomic_rw_mutex_try_shared_lock :: proc "contextless" (rw: ^Atomic_RW_Mutex) -> bool {
	state := atomic_load(&rw.state)
	// NOTE: We need to check this in a for loop, because it is possible for
	// another reader to change the underlying state, which would cause our
	// compare-and-exchange to fail.
	for state & Atomic_RW_Mutex_State_Is_Writing == 0 {
		ok: bool
		state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader)
		if ok {
			return true
		}
	}

	// A writer is active or came in during our lock attempt.
	// We try to take the full lock, and if that succeeds (perhaps because the
	// writer finished during the time since we failed our CAS), we increment
	// the reader count and move on.
	if atomic_mutex_try_lock(&rw.mutex) {
		_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader)
		atomic_mutex_unlock(&rw.mutex)
		return true
	}
	return false
}
/*
Example:
	if atomic_rw_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_rw_mutex_unlock)
atomic_rw_mutex_guard :: proc "contextless" (m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_lock(m)
	return true
}

/*
Example:
	if atomic_rw_mutex_shared_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_rw_mutex_shared_unlock)
atomic_rw_mutex_shared_guard :: proc "contextless" (m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_shared_lock(m)
	return true
}
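/*
A minimal usage sketch of the reader/writer pairings; `table_lock` and
`table` are hypothetical names used for illustration only. Any number of
read_entry calls may run concurrently, while write_entry excludes everyone.

Example:
	table_lock: Atomic_RW_Mutex
	table: map[string]int

	read_entry :: proc(key: string) -> (value: int, found: bool) {
		atomic_rw_mutex_shared_lock(&table_lock)
		defer atomic_rw_mutex_shared_unlock(&table_lock)
		value, found = table[key]
		return
	}

	write_entry :: proc(key: string, value: int) {
		atomic_rw_mutex_lock(&table_lock)
		defer atomic_rw_mutex_unlock(&table_lock)
		table[key] = value
	}
*/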
// An Atomic_Recursive_Mutex is a recursive mutual exclusion lock.
// The zero value for an Atomic_Recursive_Mutex is an unlocked mutex.
//
// An Atomic_Recursive_Mutex must not be copied after first use.
Atomic_Recursive_Mutex :: struct #no_copy {
	owner:     int,
	recursion: int,
	mutex:     Mutex,
}
atomic_recursive_mutex_lock :: proc "contextless" (m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id()
	if tid != m.owner {
		mutex_lock(&m.mutex)
	}
	// inside the lock
	m.owner = tid
	m.recursion += 1
}

atomic_recursive_mutex_unlock :: proc "contextless" (m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id()
	assert_contextless(tid == m.owner, "tid != m.owner")
	m.recursion -= 1
	if m.recursion == 0 {
		// Clear the owner before releasing the lock.
		m.owner = 0
		mutex_unlock(&m.mutex)
	}
	// outside the lock
}
atomic_recursive_mutex_try_lock :: proc "contextless" (m: ^Atomic_Recursive_Mutex) -> bool {
	tid := current_thread_id()
	if m.owner == tid {
		// We already hold the lock, so just bump the recursion count,
		// matching atomic_recursive_mutex_lock's behaviour.
		m.recursion += 1
		return true
	}
	if !mutex_try_lock(&m.mutex) {
		return false
	}
	// inside the lock
	m.owner = tid
	m.recursion += 1
	return true
}
/*
Example:
	if atomic_recursive_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_recursive_mutex_unlock)
atomic_recursive_mutex_guard :: proc "contextless" (m: ^Atomic_Recursive_Mutex) -> bool {
	atomic_recursive_mutex_lock(m)
	return true
}
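/*
A minimal usage sketch of re-entrant locking; `Node`, `tree_mutex`, and
`visit` are hypothetical names used for illustration only. The recursive
call re-locks on the same thread, which only bumps the recursion count.

Example:
	Node :: struct {
		left, right: ^Node,
	}

	tree_mutex: Atomic_Recursive_Mutex

	visit :: proc(node: ^Node) {
		atomic_recursive_mutex_lock(&tree_mutex)
		defer atomic_recursive_mutex_unlock(&tree_mutex)
		if node.left  != nil { visit(node.left)  }
		if node.right != nil { visit(node.right) }
	}
*/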
// Atomic_Cond implements a condition variable, a rendezvous point for threads
// waiting on, or signalling, the occurrence of an event.
//
// An Atomic_Cond must not be copied after first use.
Atomic_Cond :: struct #no_copy {
	state: Futex,
}
atomic_cond_wait :: proc "contextless" (c: ^Atomic_Cond, m: ^Atomic_Mutex) {
	state := u32(atomic_load_explicit(&c.state, .Relaxed))
	atomic_mutex_unlock(m)
	futex_wait(&c.state, state)
	atomic_mutex_lock(m)
}

atomic_cond_wait_with_timeout :: proc "contextless" (c: ^Atomic_Cond, m: ^Atomic_Mutex, duration: time.Duration) -> (ok: bool) {
	state := u32(atomic_load_explicit(&c.state, .Relaxed))
	atomic_mutex_unlock(m)
	ok = futex_wait_with_timeout(&c.state, state, duration)
	atomic_mutex_lock(m)
	return
}
atomic_cond_signal :: proc "contextless" (c: ^Atomic_Cond) {
	atomic_add_explicit(&c.state, 1, .Release)
	futex_signal(&c.state)
}

atomic_cond_broadcast :: proc "contextless" (c: ^Atomic_Cond) {
	atomic_add_explicit(&c.state, 1, .Release)
	futex_broadcast(&c.state)
}
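/*
A minimal usage sketch of the usual predicate loop around atomic_cond_wait.
Re-check the condition under the mutex after every wakeup, since a wait may
return on a stale signal; `queue_mutex`, `queue_cond`, and `queue_len` are
hypothetical names used for illustration only.

Example:
	queue_mutex: Atomic_Mutex
	queue_cond:  Atomic_Cond
	queue_len:   int

	wait_for_item :: proc "contextless" () {
		atomic_mutex_lock(&queue_mutex)
		defer atomic_mutex_unlock(&queue_mutex)
		for queue_len == 0 {
			atomic_cond_wait(&queue_cond, &queue_mutex)
		}
		queue_len -= 1
	}

	push_item :: proc "contextless" () {
		atomic_mutex_lock(&queue_mutex)
		queue_len += 1
		atomic_mutex_unlock(&queue_mutex)
		atomic_cond_signal(&queue_cond)
	}
*/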
// When waited upon, blocks until the internal count is greater than zero, then subtracts one.
// Posting to the semaphore increases the count by one, or by the provided amount.
//
// An Atomic_Sema must not be copied after first use.
Atomic_Sema :: struct #no_copy {
	count: Futex,
}
atomic_sema_post :: proc "contextless" (s: ^Atomic_Sema, count := 1) {
	atomic_add_explicit(&s.count, Futex(count), .Release)
	if count == 1 {
		futex_signal(&s.count)
	} else {
		futex_broadcast(&s.count)
	}
}
atomic_sema_wait :: proc "contextless" (s: ^Atomic_Sema) {
	for {
		original_count := atomic_load_explicit(&s.count, .Relaxed)
		for original_count == 0 {
			futex_wait(&s.count, u32(original_count))
			original_count = atomic_load_explicit(&s.count, .Relaxed)
		}
		// Try to decrement the count; retry from the top if another waiter beat us to it.
		if _, ok := atomic_compare_exchange_strong_explicit(&s.count, original_count, original_count-1, .Acquire, .Acquire); ok {
			return
		}
	}
}
atomic_sema_wait_with_timeout :: proc "contextless" (s: ^Atomic_Sema, duration: time.Duration) -> bool {
	if duration <= 0 {
		return false
	}
	for {
		original_count := atomic_load_explicit(&s.count, .Relaxed)
		for start := time.tick_now(); original_count == 0; /**/ {
			remaining := duration - time.tick_since(start)
			if remaining < 0 {
				return false
			}
			if !futex_wait_with_timeout(&s.count, u32(original_count), remaining) {
				return false
			}
			original_count = atomic_load_explicit(&s.count, .Relaxed)
		}
		// Try to decrement the count; retry from the top if another waiter beat us to it.
		if _, ok := atomic_compare_exchange_strong_explicit(&s.count, original_count, original_count-1, .Acquire, .Acquire); ok {
			return true
		}
	}
}
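/*
A minimal usage sketch of the semaphore as a count of pending work items;
`work_sema`, `producer`, and `consumer` are hypothetical names used for
illustration only.

Example:
	work_sema: Atomic_Sema

	producer :: proc "contextless" () {
		// ... enqueue an item somewhere ...
		atomic_sema_post(&work_sema) // count += 1, wakes one waiter
	}

	consumer :: proc "contextless" () {
		for {
			atomic_sema_wait(&work_sema) // blocks until count > 0, then count -= 1
			// ... dequeue and process one item ...
		}
	}
*/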