// primitives_atomic.odin
package sync

import "core:time"

Atomic_Mutex_State :: enum Futex {
	Unlocked = 0,
	Locked   = 1,
	Waiting  = 2,
}

// An Atomic_Mutex is a mutual exclusion lock
// The zero value for an Atomic_Mutex is an unlocked mutex
//
// An Atomic_Mutex must not be copied after first use
Atomic_Mutex :: struct {
	state: Atomic_Mutex_State,
}

// atomic_mutex_lock locks m
atomic_mutex_lock :: proc(m: ^Atomic_Mutex) {
	@(cold)
	lock_slow :: proc(m: ^Atomic_Mutex, curr_state: Atomic_Mutex_State) {
		new_state := curr_state // Make a copy of it

		spin_lock: for spin in 0..<i32(100) {
			state, ok := atomic_compare_exchange_weak_explicit(&m.state, .Unlocked, new_state, .Acquire, .Consume)
			if ok {
				return
			}

			if state == .Waiting {
				break spin_lock
			}

			for i := min(spin+1, 32); i > 0; i -= 1 {
				cpu_relax()
			}
		}

		// Set just in case 100 iterations did not do it
		new_state = .Waiting

		for {
			if atomic_exchange_explicit(&m.state, .Waiting, .Acquire) == .Unlocked {
				return
			}

			futex_wait((^Futex)(&m.state), u32(new_state))
			cpu_relax()
		}
	}

	if v := atomic_exchange_explicit(&m.state, .Locked, .Acquire); v != .Unlocked {
		lock_slow(m, v)
	}
}

// atomic_mutex_unlock unlocks m
atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) {
	@(cold)
	unlock_slow :: proc(m: ^Atomic_Mutex) {
		futex_signal((^Futex)(&m.state))
	}

	switch atomic_exchange_explicit(&m.state, .Unlocked, .Release) {
	case .Unlocked:
		unreachable()
	case .Locked:
		// Okay
	case .Waiting:
		unlock_slow(m)
	}
}
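
/*
Usage sketch for lock/unlock (illustrative only; `g_mutex` and `g_counter` are
hypothetical names, not part of this package):

	g_mutex:   Atomic_Mutex
	g_counter: int

	increment :: proc() {
		atomic_mutex_lock(&g_mutex)
		defer atomic_mutex_unlock(&g_mutex)
		g_counter += 1 // the critical section
	}
*/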

// atomic_mutex_try_lock tries to lock m, returning true on success and false on failure
atomic_mutex_try_lock :: proc(m: ^Atomic_Mutex) -> bool {
	_, ok := atomic_compare_exchange_strong_explicit(&m.state, .Unlocked, .Locked, .Acquire, .Consume)
	return ok
}
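
/*
Usage sketch for try_lock (illustrative; `g_mutex` is a hypothetical name):

	if atomic_mutex_try_lock(&g_mutex) {
		defer atomic_mutex_unlock(&g_mutex)
		// fast path: the lock was free, do the work now
	} else {
		// the lock was contended, do something else instead of blocking
	}
*/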

/*
Example:
	if atomic_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_mutex_unlock)
atomic_mutex_guard :: proc(m: ^Atomic_Mutex) -> bool {
	atomic_mutex_lock(m)
	return true
}

Atomic_RW_Mutex_State :: distinct uint
Atomic_RW_Mutex_State_Half_Width :: size_of(Atomic_RW_Mutex_State)*8/2

Atomic_RW_Mutex_State_Is_Writing :: Atomic_RW_Mutex_State(1)
Atomic_RW_Mutex_State_Writer     :: Atomic_RW_Mutex_State(1)<<1
Atomic_RW_Mutex_State_Reader     :: Atomic_RW_Mutex_State(1)<<Atomic_RW_Mutex_State_Half_Width

Atomic_RW_Mutex_State_Writer_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << 1
Atomic_RW_Mutex_State_Reader_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << Atomic_RW_Mutex_State_Half_Width
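
// A sketch of the resulting bit layout, assuming a 64-bit uint (Half_Width == 32):
//	bit  0       Is_Writing  - set while a writer holds the lock
//	bits 1..31   Writer_Mask - count of writers waiting on the lock
//	bits 32..62  Reader_Mask - count of readers holding the lock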

// An Atomic_RW_Mutex is a reader/writer mutual exclusion lock
// The lock can be held by an arbitrary number of readers or a single writer
// The zero value for an Atomic_RW_Mutex is an unlocked mutex
//
// An Atomic_RW_Mutex must not be copied after first use
Atomic_RW_Mutex :: struct {
	state: Atomic_RW_Mutex_State,
	mutex: Atomic_Mutex,
	sema:  Atomic_Sema,
}

// atomic_rw_mutex_lock locks rw for writing (with a single writer)
// If the mutex is already locked for reading or writing, this procedure blocks until the mutex is available.
atomic_rw_mutex_lock :: proc(rw: ^Atomic_RW_Mutex) {
	_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Writer)
	atomic_mutex_lock(&rw.mutex)

	// Mark the writer as active: atomic_rw_mutex_shared_unlock only posts
	// the semaphore for the last departing reader when Is_Writing is set
	state := atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing)
	if state & Atomic_RW_Mutex_State_Reader_Mask != 0 {
		atomic_sema_wait(&rw.sema)
	}
}

// atomic_rw_mutex_unlock unlocks rw for writing (with a single writer)
atomic_rw_mutex_unlock :: proc(rw: ^Atomic_RW_Mutex) {
	_ = atomic_and(&rw.state, ~Atomic_RW_Mutex_State_Is_Writing)
	atomic_mutex_unlock(&rw.mutex)
}

// atomic_rw_mutex_try_lock tries to lock rw for writing (with a single writer)
atomic_rw_mutex_try_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool {
	if atomic_mutex_try_lock(&rw.mutex) {
		state := atomic_load(&rw.state)
		if state & Atomic_RW_Mutex_State_Reader_Mask == 0 {
			_ = atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing)
			return true
		}

		atomic_mutex_unlock(&rw.mutex)
	}
	return false
}
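
/*
Writer-side usage sketch (illustrative; `g_rw` and `g_value` are hypothetical names):

	g_rw:    Atomic_RW_Mutex
	g_value: int

	set_value :: proc(v: int) {
		atomic_rw_mutex_lock(&g_rw)
		defer atomic_rw_mutex_unlock(&g_rw)
		g_value = v // exclusive access: no readers or other writers
	}
*/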

// atomic_rw_mutex_shared_lock locks rw for reading (with arbitrary number of readers)
atomic_rw_mutex_shared_lock :: proc(rw: ^Atomic_RW_Mutex) {
	state := atomic_load(&rw.state)
	for state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 {
		ok: bool
		state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader)
		if ok {
			return
		}
	}

	atomic_mutex_lock(&rw.mutex)
	_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader)
	atomic_mutex_unlock(&rw.mutex)
}

// atomic_rw_mutex_shared_unlock unlocks rw for reading (with arbitrary number of readers)
atomic_rw_mutex_shared_unlock :: proc(rw: ^Atomic_RW_Mutex) {
	state := atomic_sub(&rw.state, Atomic_RW_Mutex_State_Reader)

	if (state & Atomic_RW_Mutex_State_Reader_Mask == Atomic_RW_Mutex_State_Reader) &&
	   (state & Atomic_RW_Mutex_State_Is_Writing != 0) {
		atomic_sema_post(&rw.sema)
	}
}

// atomic_rw_mutex_try_shared_lock tries to lock rw for reading (with arbitrary number of readers)
atomic_rw_mutex_try_shared_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool {
	state := atomic_load(&rw.state)
	if state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 {
		_, ok := atomic_compare_exchange_strong(&rw.state, state, state + Atomic_RW_Mutex_State_Reader)
		if ok {
			return true
		}
	}

	if atomic_mutex_try_lock(&rw.mutex) {
		_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader)
		atomic_mutex_unlock(&rw.mutex)
		return true
	}
	return false
}
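
/*
Reader-side usage sketch (illustrative; `g_rw` and `g_value` are hypothetical names):

	get_value :: proc() -> int {
		atomic_rw_mutex_shared_lock(&g_rw)
		defer atomic_rw_mutex_shared_unlock(&g_rw)
		return g_value // shared access: may run concurrently with other readers
	}
*/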

/*
Example:
	if atomic_rw_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_rw_mutex_unlock)
atomic_rw_mutex_guard :: proc(m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_lock(m)
	return true
}

/*
Example:
	if atomic_rw_mutex_shared_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_rw_mutex_shared_unlock)
atomic_rw_mutex_shared_guard :: proc(m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_shared_lock(m)
	return true
}

// An Atomic_Recursive_Mutex is a recursive mutual exclusion lock
// The zero value for an Atomic_Recursive_Mutex is an unlocked mutex
//
// An Atomic_Recursive_Mutex must not be copied after first use
Atomic_Recursive_Mutex :: struct {
	owner:     int,
	recursion: int,
	mutex:     Mutex,
}

atomic_recursive_mutex_lock :: proc(m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id()
	if tid != m.owner {
		mutex_lock(&m.mutex)
	}
	// inside the lock
	m.owner = tid
	m.recursion += 1
}

atomic_recursive_mutex_unlock :: proc(m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id()
	assert(tid == m.owner)
	m.recursion -= 1
	if m.recursion == 0 {
		m.owner = 0
		mutex_unlock(&m.mutex)
	}
	// outside the lock
}

atomic_recursive_mutex_try_lock :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
	tid := current_thread_id()
	if m.owner == tid {
		return mutex_try_lock(&m.mutex)
	}
	if !mutex_try_lock(&m.mutex) {
		return false
	}
	// inside the lock
	m.owner = tid
	m.recursion += 1
	return true
}
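
/*
Usage sketch (illustrative; `g_rec` is a hypothetical name): the same thread may
re-lock without deadlocking, and must unlock once per lock:

	g_rec: Atomic_Recursive_Mutex

	outer :: proc() {
		atomic_recursive_mutex_lock(&g_rec)
		defer atomic_recursive_mutex_unlock(&g_rec)
		inner() // locking again on this thread just bumps the recursion count
	}

	inner :: proc() {
		atomic_recursive_mutex_lock(&g_rec)
		defer atomic_recursive_mutex_unlock(&g_rec)
	}
*/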

/*
Example:
	if atomic_recursive_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_recursive_mutex_unlock)
atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
	atomic_recursive_mutex_lock(m)
	return true
}

@(private="file")
Queue_Item :: struct {
	next:  ^Queue_Item,
	futex: Futex,
}

@(private="file")
queue_item_wait :: proc(item: ^Queue_Item) {
	for atomic_load_explicit(&item.futex, .Acquire) == 0 {
		futex_wait(&item.futex, 0)
		cpu_relax()
	}
}

@(private="file")
queue_item_wait_with_timeout :: proc(item: ^Queue_Item, duration: time.Duration) -> bool {
	start := time.tick_now()
	for atomic_load_explicit(&item.futex, .Acquire) == 0 {
		remaining := duration - time.tick_since(start)
		if remaining < 0 {
			return false
		}
		if !futex_wait_with_timeout(&item.futex, 0, remaining) {
			return false
		}
		cpu_relax()
	}
	return true
}

@(private="file")
queue_item_signal :: proc(item: ^Queue_Item) {
	atomic_store_explicit(&item.futex, 1, .Release)
	futex_signal(&item.futex)
}

// Atomic_Cond implements a condition variable, a rendezvous point for threads
// waiting for, or signalling, the occurrence of an event
//
// An Atomic_Cond must not be copied after first use
Atomic_Cond :: struct {
	queue_mutex: Atomic_Mutex,
	queue_head:  ^Queue_Item,
	pending:     bool,
}

atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) {
	waiter := &Queue_Item{}

	atomic_mutex_lock(&c.queue_mutex)
	waiter.next = c.queue_head
	c.queue_head = waiter
	atomic_store(&c.pending, true)
	atomic_mutex_unlock(&c.queue_mutex)

	atomic_mutex_unlock(m)
	queue_item_wait(waiter)
	atomic_mutex_lock(m)
}

atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, duration: time.Duration) -> (ok: bool) {
	waiter := &Queue_Item{}

	atomic_mutex_lock(&c.queue_mutex)
	waiter.next = c.queue_head
	c.queue_head = waiter
	atomic_store(&c.pending, true)
	atomic_mutex_unlock(&c.queue_mutex)

	atomic_mutex_unlock(m)
	ok = queue_item_wait_with_timeout(waiter, duration)
	atomic_mutex_lock(m)
	return
}

atomic_cond_signal :: proc(c: ^Atomic_Cond) {
	if !atomic_load(&c.pending) {
		return
	}

	atomic_mutex_lock(&c.queue_mutex)
	waiter := c.queue_head
	if c.queue_head != nil {
		c.queue_head = c.queue_head.next
	}
	atomic_store(&c.pending, c.queue_head != nil)
	atomic_mutex_unlock(&c.queue_mutex)

	if waiter != nil {
		queue_item_signal(waiter)
	}
}

atomic_cond_broadcast :: proc(c: ^Atomic_Cond) {
	if !atomic_load(&c.pending) {
		return
	}

	atomic_store(&c.pending, false)

	atomic_mutex_lock(&c.queue_mutex)
	waiters := c.queue_head
	c.queue_head = nil
	atomic_mutex_unlock(&c.queue_mutex)

	for waiters != nil {
		// Read next before signalling: once signalled, the waiter may return
		// and invalidate its Queue_Item, which lives on the waiter's stack
		next := waiters.next
		queue_item_signal(waiters)
		waiters = next
	}
}
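
/*
Usage sketch for the condition variable (illustrative; `g_mutex`, `g_cond`, and
`g_ready` are hypothetical names). The predicate is always re-checked in a loop,
since a wait may wake without the condition holding:

	g_mutex: Atomic_Mutex
	g_cond:  Atomic_Cond
	g_ready: bool

	wait_until_ready :: proc() {
		atomic_mutex_lock(&g_mutex)
		defer atomic_mutex_unlock(&g_mutex)
		for !g_ready {
			atomic_cond_wait(&g_cond, &g_mutex)
		}
	}

	mark_ready :: proc() {
		atomic_mutex_lock(&g_mutex)
		g_ready = true
		atomic_mutex_unlock(&g_mutex)
		atomic_cond_signal(&g_cond)
	}
*/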

// When waited upon, an Atomic_Sema blocks until the internal count is greater than zero, then subtracts one.
// Posting to the semaphore increases the count by one, or by the provided amount.
//
// An Atomic_Sema must not be copied after first use
Atomic_Sema :: struct {
	mutex: Atomic_Mutex,
	cond:  Atomic_Cond,
	count: int,
}

atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) {
	atomic_mutex_lock(&s.mutex)
	defer atomic_mutex_unlock(&s.mutex)
	s.count += count
	atomic_cond_signal(&s.cond)
}

atomic_sema_wait :: proc(s: ^Atomic_Sema) {
	atomic_mutex_lock(&s.mutex)
	defer atomic_mutex_unlock(&s.mutex)

	for s.count == 0 {
		atomic_cond_wait(&s.cond, &s.mutex)
	}

	s.count -= 1
	if s.count > 0 {
		atomic_cond_signal(&s.cond)
	}
}

atomic_sema_wait_with_timeout :: proc(s: ^Atomic_Sema, duration: time.Duration) -> bool {
	if duration <= 0 {
		return false
	}
	atomic_mutex_lock(&s.mutex)
	defer atomic_mutex_unlock(&s.mutex)

	start := time.tick_now()
	for s.count == 0 {
		remaining := duration - time.tick_since(start)
		if remaining < 0 {
			return false
		}
		if !atomic_cond_wait_with_timeout(&s.cond, &s.mutex, remaining) {
			return false
		}
	}

	s.count -= 1
	if s.count > 0 {
		atomic_cond_signal(&s.cond)
	}
	return true
}
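
/*
Usage sketch for the semaphore (illustrative; `g_sema` is a hypothetical name).
The zero value starts with a count of zero, so waiters block until a post:

	g_sema: Atomic_Sema

	producer :: proc() {
		// ...publish one unit of work...
		atomic_sema_post(&g_sema) // count += 1, wakes a waiter
	}

	consumer :: proc() {
		atomic_sema_wait(&g_sema) // blocks until count > 0, then count -= 1
		// ...consume one unit of work...
	}

	// Or, to give up after one second:
	if atomic_sema_wait_with_timeout(&g_sema, 1 * time.Second) {
		// acquired
	}
*/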