primitives_atomic.odin

package sync2

import "core:time"

Atomic_Mutex_State :: enum Futex {
	Unlocked = 0,
	Locked   = 1,
	Waiting  = 2,
}

// An Atomic_Mutex is a mutual exclusion lock
// The zero value for an Atomic_Mutex is an unlocked mutex
//
// An Atomic_Mutex must not be copied after first use
Atomic_Mutex :: struct {
	state: Atomic_Mutex_State,
}

// atomic_mutex_lock locks m
atomic_mutex_lock :: proc(m: ^Atomic_Mutex) {
	@(cold)
	lock_slow :: proc(m: ^Atomic_Mutex, curr_state: Atomic_Mutex_State) {
		new_state := curr_state // Make a copy of it

		// Spin for a short while in the hope that the lock is released quickly
		spin_lock: for spin in 0..<i32(100) {
			state, ok := atomic_compare_exchange_weak_acquire(&m.state, .Unlocked, new_state)
			if ok {
				return
			}

			if state == .Waiting {
				// Already contended; no point in spinning further
				break spin_lock
			}

			for i := min(spin+1, 32); i > 0; i -= 1 {
				cpu_relax()
			}
		}

		// Mark the lock as contended so that the futex wait below compares
		// against the value the exchange actually stores
		new_state = .Waiting

		for {
			if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked {
				return
			}

			futex_wait((^Futex)(&m.state), u32(new_state))
			cpu_relax()
		}
	}

	switch v := atomic_exchange_acquire(&m.state, .Locked); v {
	case .Unlocked:
		// Okay
	case: fallthrough
	case .Locked, .Waiting:
		lock_slow(m, v)
	}
}

// atomic_mutex_unlock unlocks m
atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) {
	@(cold)
	unlock_slow :: proc(m: ^Atomic_Mutex) {
		futex_signal((^Futex)(&m.state))
	}

	switch atomic_exchange_release(&m.state, .Unlocked) {
	case .Unlocked:
		unreachable()
	case .Locked:
		// Okay
	case .Waiting:
		unlock_slow(m)
	}
}

// atomic_mutex_try_lock tries to lock m; it returns true on success and false on failure
atomic_mutex_try_lock :: proc(m: ^Atomic_Mutex) -> bool {
	_, ok := atomic_compare_exchange_strong_acquire(&m.state, .Unlocked, .Locked)
	return ok
}
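
/*
Example: guarding a shared counter with an Atomic_Mutex (a minimal usage
sketch; `Counter`, `increment`, and `try_increment` are hypothetical names,
not part of this package):

	Counter :: struct {
		mutex: Atomic_Mutex,
		value: int,
	}

	increment :: proc(c: ^Counter) {
		atomic_mutex_lock(&c.mutex)
		defer atomic_mutex_unlock(&c.mutex)
		c.value += 1
	}

	try_increment :: proc(c: ^Counter) -> bool {
		if !atomic_mutex_try_lock(&c.mutex) {
			return false
		}
		defer atomic_mutex_unlock(&c.mutex)
		c.value += 1
		return true
	}
*/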

/*
Example:
	if atomic_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_mutex_unlock)
atomic_mutex_guard :: proc(m: ^Atomic_Mutex) -> bool {
	atomic_mutex_lock(m)
	return true
}

Atomic_RW_Mutex_State            :: distinct uint
Atomic_RW_Mutex_State_Half_Width :: size_of(Atomic_RW_Mutex_State)*8/2

Atomic_RW_Mutex_State_Is_Writing :: Atomic_RW_Mutex_State(1)
Atomic_RW_Mutex_State_Writer     :: Atomic_RW_Mutex_State(1)<<1
Atomic_RW_Mutex_State_Reader     :: Atomic_RW_Mutex_State(1)<<Atomic_RW_Mutex_State_Half_Width

Atomic_RW_Mutex_State_Writer_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << 1
Atomic_RW_Mutex_State_Reader_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << Atomic_RW_Mutex_State_Half_Width
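
/*
The state packs three fields into a single uint. As a worked illustration of
the constants above (assuming a 64-bit target, so size_of(uint) == 8 and
Half_Width == 32):

	bit  0      Is_Writing - set while a writer holds the lock
	bits 1..31  writer count, in units of Writer (1 << 1)
	bits 32..62 reader count, in units of Reader (1 << 32)

Writer_Mask and Reader_Mask each select Half_Width-1 bits of their counter,
so e.g. state & Atomic_RW_Mutex_State_Reader_Mask is nonzero while any
readers are active.
*/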

// An Atomic_RW_Mutex is a reader/writer mutual exclusion lock
// The lock can be held by an arbitrary number of readers or a single writer
// The zero value for an Atomic_RW_Mutex is an unlocked mutex
//
// An Atomic_RW_Mutex must not be copied after first use
Atomic_RW_Mutex :: struct {
	state: Atomic_RW_Mutex_State,
	mutex: Atomic_Mutex,
	sema:  Atomic_Sema,
}

// atomic_rw_mutex_lock locks rw for writing (with a single writer)
// If the mutex is already locked for reading or writing, the call blocks until the mutex is available
atomic_rw_mutex_lock :: proc(rw: ^Atomic_RW_Mutex) {
	_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Writer)
	atomic_mutex_lock(&rw.mutex)

	// Mark the writer as active; atomic_rw_mutex_shared_unlock checks this
	// flag to know when to post the semaphore and wake the writer
	state := atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing)
	if state & Atomic_RW_Mutex_State_Reader_Mask != 0 {
		// Wait for the remaining readers to drain
		atomic_sema_wait(&rw.sema)
	}
}

// atomic_rw_mutex_unlock unlocks rw for writing (with a single writer)
atomic_rw_mutex_unlock :: proc(rw: ^Atomic_RW_Mutex) {
	_ = atomic_and(&rw.state, ~Atomic_RW_Mutex_State_Is_Writing)
	atomic_mutex_unlock(&rw.mutex)
}

// atomic_rw_mutex_try_lock tries to lock rw for writing (with a single writer)
atomic_rw_mutex_try_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool {
	if atomic_mutex_try_lock(&rw.mutex) {
		state := atomic_load(&rw.state)
		if state & Atomic_RW_Mutex_State_Reader_Mask == 0 {
			// No readers are active, so the writer may proceed
			_ = atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing)
			return true
		}

		atomic_mutex_unlock(&rw.mutex)
	}
	return false
}
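
/*
Example: exclusive (writer) access to a shared table (a minimal sketch;
`Table` and `table_put` are hypothetical names, not part of this package):

	Table :: struct {
		rw:   Atomic_RW_Mutex,
		data: map[string]int,
	}

	table_put :: proc(t: ^Table, key: string, value: int) {
		atomic_rw_mutex_lock(&t.rw)
		defer atomic_rw_mutex_unlock(&t.rw)
		t.data[key] = value
	}
*/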

// atomic_rw_mutex_shared_lock locks rw for reading (with an arbitrary number of readers)
atomic_rw_mutex_shared_lock :: proc(rw: ^Atomic_RW_Mutex) {
	state := atomic_load(&rw.state)
	for state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 {
		ok: bool
		state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader)
		if ok {
			return
		}
	}

	// Writers are active or waiting: fall back to taking the mutex
	atomic_mutex_lock(&rw.mutex)
	_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader)
	atomic_mutex_unlock(&rw.mutex)
}

// atomic_rw_mutex_shared_unlock unlocks rw for reading (with an arbitrary number of readers)
atomic_rw_mutex_shared_unlock :: proc(rw: ^Atomic_RW_Mutex) {
	state := atomic_sub(&rw.state, Atomic_RW_Mutex_State_Reader)

	// If this was the last reader and a writer is waiting, wake the writer
	if (state & Atomic_RW_Mutex_State_Reader_Mask == Atomic_RW_Mutex_State_Reader) &&
	   (state & Atomic_RW_Mutex_State_Is_Writing != 0) {
		atomic_sema_post(&rw.sema)
	}
}

// atomic_rw_mutex_try_shared_lock tries to lock rw for reading (with an arbitrary number of readers)
atomic_rw_mutex_try_shared_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool {
	state := atomic_load(&rw.state)
	if state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 {
		_, ok := atomic_compare_exchange_strong(&rw.state, state, state + Atomic_RW_Mutex_State_Reader)
		if ok {
			return true
		}
	}

	if atomic_mutex_try_lock(&rw.mutex) {
		_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader)
		atomic_mutex_unlock(&rw.mutex)
		return true
	}
	return false
}
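
/*
Example: shared (reader) access, any number of which may run concurrently
(a minimal sketch; `Table` and `table_get` are hypothetical names continuing
the writer example above):

	table_get :: proc(t: ^Table, key: string) -> (value: int, found: bool) {
		atomic_rw_mutex_shared_lock(&t.rw)
		defer atomic_rw_mutex_shared_unlock(&t.rw)
		value, found = t.data[key]
		return
	}
*/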

/*
Example:
	if atomic_rw_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_rw_mutex_unlock)
atomic_rw_mutex_guard :: proc(m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_lock(m)
	return true
}

/*
Example:
	if atomic_rw_mutex_shared_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_rw_mutex_shared_unlock)
atomic_rw_mutex_shared_guard :: proc(m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_shared_lock(m)
	return true
}

// An Atomic_Recursive_Mutex is a recursive mutual exclusion lock
// The zero value for an Atomic_Recursive_Mutex is an unlocked mutex
//
// An Atomic_Recursive_Mutex must not be copied after first use
Atomic_Recursive_Mutex :: struct {
	owner:     int,
	recursion: int,
	mutex:     Mutex,
}

// atomic_recursive_mutex_lock locks m; the owning thread may lock it again without blocking
atomic_recursive_mutex_lock :: proc(m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id()
	if tid != m.owner {
		mutex_lock(&m.mutex)
	}
	// inside the lock
	m.owner = tid
	m.recursion += 1
}

// atomic_recursive_mutex_unlock unlocks m, releasing the underlying mutex once the recursion depth reaches zero
atomic_recursive_mutex_unlock :: proc(m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id()
	assert(tid == m.owner)
	m.recursion -= 1
	if m.recursion == 0 {
		m.owner = 0
		mutex_unlock(&m.mutex)
	}
	// outside the lock
}

// atomic_recursive_mutex_try_lock tries to lock m; it returns true on success and false on failure
atomic_recursive_mutex_try_lock :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
	tid := current_thread_id()
	if m.owner == tid {
		// Already held by this thread: just increase the recursion depth
		m.recursion += 1
		return true
	}
	if !mutex_try_lock(&m.mutex) {
		return false
	}
	// inside the lock
	m.owner = tid
	m.recursion += 1
	return true
}
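
/*
Example: a procedure that re-enters itself while the calling thread already
holds the lock (a minimal sketch; `Node` and `traverse` are hypothetical
names, not part of this package):

	Node :: struct {
		children: []^Node,
	}

	traverse :: proc(m: ^Atomic_Recursive_Mutex, n: ^Node) {
		atomic_recursive_mutex_lock(m)
		defer atomic_recursive_mutex_unlock(m)
		for child in n.children {
			traverse(m, child) // relocking on the same thread does not deadlock
		}
	}
*/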

/*
Example:
	if atomic_recursive_mutex_guard(&m) {
		...
	}
*/
@(deferred_in=atomic_recursive_mutex_unlock)
atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
	atomic_recursive_mutex_lock(m)
	return true
}

@(private="file")
Queue_Item :: struct {
	next:  ^Queue_Item,
	futex: Futex,
}

@(private="file")
queue_item_wait :: proc(item: ^Queue_Item) {
	for atomic_load_acquire(&item.futex) == 0 {
		futex_wait(&item.futex, 0)
		cpu_relax()
	}
}

@(private="file")
queue_item_wait_with_timeout :: proc(item: ^Queue_Item, duration: time.Duration) -> bool {
	start := time.tick_now()
	for atomic_load_acquire(&item.futex) == 0 {
		remaining := duration - time.tick_since(start)
		if remaining < 0 {
			return false
		}

		if !futex_wait_with_timeout(&item.futex, 0, remaining) {
			return false
		}
		cpu_relax()
	}
	return true
}

@(private="file")
queue_item_signal :: proc(item: ^Queue_Item) {
	atomic_store_release(&item.futex, 1)
	futex_signal(&item.futex)
}

// Atomic_Cond implements a condition variable, a rendezvous point for threads
// waiting on, or signalling, the occurrence of an event
//
// An Atomic_Cond must not be copied after first use
Atomic_Cond :: struct {
	queue_mutex: Atomic_Mutex,
	queue_head:  ^Queue_Item,
	pending:     bool,
}

atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) {
	waiter := &Queue_Item{}

	// Push this waiter onto the queue
	atomic_mutex_lock(&c.queue_mutex)
	waiter.next = c.queue_head
	c.queue_head = waiter
	atomic_store(&c.pending, true)
	atomic_mutex_unlock(&c.queue_mutex)

	// Release m while waiting, then reacquire it before returning
	atomic_mutex_unlock(m)
	queue_item_wait(waiter)
	atomic_mutex_lock(m)
}

atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, duration: time.Duration) -> (ok: bool) {
	waiter := &Queue_Item{}

	atomic_mutex_lock(&c.queue_mutex)
	waiter.next = c.queue_head
	c.queue_head = waiter
	atomic_store(&c.pending, true)
	atomic_mutex_unlock(&c.queue_mutex)

	atomic_mutex_unlock(m)
	ok = queue_item_wait_with_timeout(waiter, duration)
	atomic_mutex_lock(m)
	return
}

atomic_cond_signal :: proc(c: ^Atomic_Cond) {
	if !atomic_load(&c.pending) {
		return
	}

	// Pop a single waiter from the queue
	atomic_mutex_lock(&c.queue_mutex)
	waiter := c.queue_head
	if c.queue_head != nil {
		c.queue_head = c.queue_head.next
	}
	atomic_store(&c.pending, c.queue_head != nil)
	atomic_mutex_unlock(&c.queue_mutex)

	if waiter != nil {
		queue_item_signal(waiter)
	}
}

atomic_cond_broadcast :: proc(c: ^Atomic_Cond) {
	if !atomic_load(&c.pending) {
		return
	}
	atomic_store(&c.pending, false)

	// Take the whole queue and wake every waiter on it
	atomic_mutex_lock(&c.queue_mutex)
	waiters := c.queue_head
	c.queue_head = nil
	atomic_mutex_unlock(&c.queue_mutex)

	for waiters != nil {
		// Read next before signalling: a signalled waiter may return from
		// its wait and invalidate its queue item immediately
		next := waiters.next
		queue_item_signal(waiters)
		waiters = next
	}
}
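
/*
Example: waiting on a predicate with Atomic_Cond (a minimal sketch;
`Job_Queue`, `push_job`, and `pop_job` are hypothetical names, not part of
this package). As with any condition variable, the predicate is rechecked
in a loop to guard against spurious or stolen wakeups:

	Job_Queue :: struct {
		mutex: Atomic_Mutex,
		cond:  Atomic_Cond,
		jobs:  [dynamic]int,
	}

	push_job :: proc(q: ^Job_Queue, job: int) {
		atomic_mutex_lock(&q.mutex)
		append(&q.jobs, job)
		atomic_mutex_unlock(&q.mutex)
		atomic_cond_signal(&q.cond)
	}

	pop_job :: proc(q: ^Job_Queue) -> int {
		atomic_mutex_lock(&q.mutex)
		defer atomic_mutex_unlock(&q.mutex)
		for len(q.jobs) == 0 {
			atomic_cond_wait(&q.cond, &q.mutex)
		}
		return pop(&q.jobs)
	}
*/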

// An Atomic_Sema is a counting semaphore
// When waited upon, it blocks until the internal count is greater than zero, then subtracts one
// Posting to the semaphore increases the count by one, or the provided amount
//
// An Atomic_Sema must not be copied after first use
Atomic_Sema :: struct {
	mutex: Atomic_Mutex,
	cond:  Atomic_Cond,
	count: int,
}

atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) {
	atomic_mutex_lock(&s.mutex)
	defer atomic_mutex_unlock(&s.mutex)

	s.count += count
	atomic_cond_signal(&s.cond)
}

atomic_sema_wait :: proc(s: ^Atomic_Sema) {
	atomic_mutex_lock(&s.mutex)
	defer atomic_mutex_unlock(&s.mutex)

	for s.count == 0 {
		atomic_cond_wait(&s.cond, &s.mutex)
	}

	s.count -= 1
	if s.count > 0 {
		// Pass the signal on to any other waiter
		atomic_cond_signal(&s.cond)
	}
}

atomic_sema_wait_with_timeout :: proc(s: ^Atomic_Sema, duration: time.Duration) -> bool {
	if duration <= 0 {
		return false
	}
	atomic_mutex_lock(&s.mutex)
	defer atomic_mutex_unlock(&s.mutex)

	start := time.tick_now()
	for s.count == 0 {
		remaining := duration - time.tick_since(start)
		if remaining < 0 {
			return false
		}

		if !atomic_cond_wait_with_timeout(&s.cond, &s.mutex, remaining) {
			return false
		}
	}

	s.count -= 1
	if s.count > 0 {
		atomic_cond_signal(&s.cond)
	}
	return true
}
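
/*
Example: bounding concurrency with an Atomic_Sema (a minimal sketch; `sema`,
`setup`, and `worker` are hypothetical names, not part of this package):

	sema: Atomic_Sema

	setup :: proc() {
		atomic_sema_post(&sema, 4) // allow up to 4 workers at once
	}

	worker :: proc() {
		atomic_sema_wait(&sema)       // take a slot, blocking if none is free
		defer atomic_sema_post(&sema) // give the slot back

		// ... do bounded work here ...
	}
*/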