primitives_atomic.odin

package sync2

import "core:time"

Atomic_Mutex_State :: enum i32 {
	Unlocked = 0, // no thread holds the mutex
	Locked   = 1, // a thread holds the mutex, with no known waiters
	Waiting  = 2, // a thread holds the mutex, and other threads may be waiting
}

// An Atomic_Mutex is a mutual exclusion lock
// The zero value for an Atomic_Mutex is an unlocked mutex
//
// An Atomic_Mutex must not be copied after first use
Atomic_Mutex :: struct {
	state: Atomic_Mutex_State,
}
// atomic_mutex_lock locks m
atomic_mutex_lock :: proc(m: ^Atomic_Mutex) {
	@(cold)
	lock_slow :: proc(m: ^Atomic_Mutex, curr_state: Atomic_Mutex_State) {
		new_state := curr_state; // Make a copy of it

		spin_lock: for spin in 0..<i32(100) {
			state, ok := atomic_compare_exchange_weak_acquire(&m.state, .Unlocked, new_state);
			if ok {
				return;
			}

			if state == .Waiting {
				break spin_lock;
			}

			for i := min(spin+1, 32); i > 0; i -= 1 {
				cpu_relax();
			}
		}

		for {
			if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked {
				return;
			}
			// TODO(bill): Use a Futex here for Linux to improve performance and error handling
			cpu_relax();
		}
	}

	switch v := atomic_exchange_acquire(&m.state, .Locked); v {
	case .Unlocked:
		// Okay
	case: fallthrough;
	case .Locked, .Waiting:
		lock_slow(m, v);
	}
}
// atomic_mutex_unlock unlocks m
atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) {
	@(cold)
	unlock_slow :: proc(m: ^Atomic_Mutex) {
		// TODO(bill): Use a Futex here for Linux to improve performance and error handling
	}

	switch atomic_exchange_release(&m.state, .Unlocked) {
	case .Unlocked:
		unreachable();
	case .Locked:
		// Okay
	case .Waiting:
		unlock_slow(m);
	}
}
// atomic_mutex_try_lock tries to lock m, will return true on success, and false on failure
atomic_mutex_try_lock :: proc(m: ^Atomic_Mutex) -> bool {
	_, ok := atomic_compare_exchange_strong_acquire(&m.state, .Unlocked, .Locked);
	return ok;
}
// Example:
//
//	if atomic_mutex_guard(&m) {
//		...
//	}
//
@(deferred_in=atomic_mutex_unlock)
atomic_mutex_guard :: proc(m: ^Atomic_Mutex) -> bool {
	atomic_mutex_lock(m);
	return true;
}
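
// Usage sketch for the non-guard procedures (illustrative only; `g_lock` and
// `g_count` are hypothetical names, not part of this package):
//
//	g_lock:  Atomic_Mutex;
//	g_count: int;
//
//	increment :: proc() {
//		atomic_mutex_lock(&g_lock);
//		defer atomic_mutex_unlock(&g_lock);
//		g_count += 1;
//	}
//
//	try_increment :: proc() -> bool {
//		if !atomic_mutex_try_lock(&g_lock) {
//			return false; // another thread holds the lock; do not block
//		}
//		defer atomic_mutex_unlock(&g_lock);
//		g_count += 1;
//		return true;
//	}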
Atomic_RW_Mutex_State             :: distinct uint;
Atomic_RW_Mutex_State_Half_Width  :: size_of(Atomic_RW_Mutex_State)*8/2;
Atomic_RW_Mutex_State_Is_Writing  :: Atomic_RW_Mutex_State(1);
Atomic_RW_Mutex_State_Writer      :: Atomic_RW_Mutex_State(1)<<1;
Atomic_RW_Mutex_State_Reader      :: Atomic_RW_Mutex_State(1)<<Atomic_RW_Mutex_State_Half_Width;
Atomic_RW_Mutex_State_Writer_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << 1;
Atomic_RW_Mutex_State_Reader_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << Atomic_RW_Mutex_State_Half_Width;
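
// Layout of the state word implied by the constants above, assuming a 64-bit
// uint (so Half_Width == 32):
//
//	bit  0        Is_Writing  - set while a writer holds the lock
//	bits 1..=31   Writer_Mask - count of writers waiting on the lock
//	bits 32..=62  Reader_Mask - count of readers holding the lock
//
// (the top bit is unused)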
// An Atomic_RW_Mutex is a reader/writer mutual exclusion lock
// The lock can be held by an arbitrary number of readers or a single writer
// The zero value for an Atomic_RW_Mutex is an unlocked mutex
//
// An Atomic_RW_Mutex must not be copied after first use
Atomic_RW_Mutex :: struct {
	state: Atomic_RW_Mutex_State,
	mutex: Atomic_Mutex,
	sema:  Atomic_Sema,
}
// atomic_rw_mutex_lock locks rw for writing (with a single writer)
// If the mutex is already locked for reading or writing, the mutex blocks until the mutex is available.
atomic_rw_mutex_lock :: proc(rw: ^Atomic_RW_Mutex) {
	_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Writer);
	atomic_mutex_lock(&rw.mutex);

	// The writer now holds the inner mutex: trade its "pending writer" count
	// for the Is_Writing flag, which atomic_rw_mutex_shared_unlock checks in
	// order to wake this writer once the last reader leaves
	state := atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing);
	_ = atomic_sub(&rw.state, Atomic_RW_Mutex_State_Writer);
	if state & Atomic_RW_Mutex_State_Reader_Mask != 0 {
		atomic_sema_wait(&rw.sema);
	}
}
// atomic_rw_mutex_unlock unlocks rw for writing (with a single writer)
atomic_rw_mutex_unlock :: proc(rw: ^Atomic_RW_Mutex) {
	_ = atomic_and(&rw.state, ~Atomic_RW_Mutex_State_Is_Writing);
	atomic_mutex_unlock(&rw.mutex);
}
// atomic_rw_mutex_try_lock tries to lock rw for writing (with a single writer)
atomic_rw_mutex_try_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool {
	if atomic_mutex_try_lock(&rw.mutex) {
		state := atomic_load(&rw.state);
		if state & Atomic_RW_Mutex_State_Reader_Mask == 0 {
			_ = atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing);
			return true;
		}

		atomic_mutex_unlock(&rw.mutex);
	}
	return false;
}
// atomic_rw_mutex_shared_lock locks rw for reading (with arbitrary number of readers)
atomic_rw_mutex_shared_lock :: proc(rw: ^Atomic_RW_Mutex) {
	state := atomic_load(&rw.state);
	for state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 {
		ok: bool;
		state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader);
		if ok {
			return;
		}
	}

	atomic_mutex_lock(&rw.mutex);
	_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader);
	atomic_mutex_unlock(&rw.mutex);
}
// atomic_rw_mutex_shared_unlock unlocks rw for reading (with arbitrary number of readers)
atomic_rw_mutex_shared_unlock :: proc(rw: ^Atomic_RW_Mutex) {
	state := atomic_sub(&rw.state, Atomic_RW_Mutex_State_Reader);

	// The last reader to leave wakes any writer blocked in atomic_rw_mutex_lock
	if (state & Atomic_RW_Mutex_State_Reader_Mask == Atomic_RW_Mutex_State_Reader) &&
	   (state & Atomic_RW_Mutex_State_Is_Writing != 0) {
		atomic_sema_post(&rw.sema);
	}
}
// atomic_rw_mutex_try_shared_lock tries to lock rw for reading (with arbitrary number of readers)
atomic_rw_mutex_try_shared_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool {
	state := atomic_load(&rw.state);
	if state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 {
		_, ok := atomic_compare_exchange_strong(&rw.state, state, state + Atomic_RW_Mutex_State_Reader);
		if ok {
			return true;
		}
	}
	if atomic_mutex_try_lock(&rw.mutex) {
		_ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader);
		atomic_mutex_unlock(&rw.mutex);
		return true;
	}
	return false;
}
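
// Non-blocking usage sketch (illustrative only; `cache_lock` is a hypothetical name):
//
//	if atomic_rw_mutex_try_shared_lock(&cache_lock) {
//		defer atomic_rw_mutex_shared_unlock(&cache_lock);
//		// ... read the shared data without ever blocking ...
//	}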
// Example:
//
//	if atomic_rw_mutex_guard(&m) {
//		...
//	}
//
@(deferred_in=atomic_rw_mutex_unlock)
atomic_rw_mutex_guard :: proc(m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_lock(m);
	return true;
}

// Example:
//
//	if atomic_rw_mutex_shared_guard(&m) {
//		...
//	}
//
@(deferred_in=atomic_rw_mutex_shared_unlock)
atomic_rw_mutex_shared_guard :: proc(m: ^Atomic_RW_Mutex) -> bool {
	atomic_rw_mutex_shared_lock(m);
	return true;
}
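
// Reader/writer usage sketch (illustrative only; `table` and `table_lock` are
// hypothetical names, not part of this package):
//
//	table_lock: Atomic_RW_Mutex;
//	table:      map[string]int;
//
//	lookup :: proc(key: string) -> (int, bool) {
//		atomic_rw_mutex_shared_lock(&table_lock);  // many readers may enter at once
//		defer atomic_rw_mutex_shared_unlock(&table_lock);
//		v, ok := table[key];
//		return v, ok;
//	}
//
//	store :: proc(key: string, value: int) {
//		atomic_rw_mutex_lock(&table_lock);         // writers are exclusive
//		defer atomic_rw_mutex_unlock(&table_lock);
//		table[key] = value;
//	}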
// An Atomic_Recursive_Mutex is a recursive mutual exclusion lock
// The zero value for an Atomic_Recursive_Mutex is an unlocked mutex
//
// An Atomic_Recursive_Mutex must not be copied after first use
Atomic_Recursive_Mutex :: struct {
	owner:     int,
	recursion: int,
	mutex:     Atomic_Mutex,
}
atomic_recursive_mutex_lock :: proc(m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id();
	if tid != m.owner {
		atomic_mutex_lock(&m.mutex);
	}
	// inside the lock
	m.owner = tid;
	m.recursion += 1;
}

atomic_recursive_mutex_unlock :: proc(m: ^Atomic_Recursive_Mutex) {
	tid := current_thread_id();
	assert(tid == m.owner);
	m.recursion -= 1;
	if m.recursion == 0 {
		m.owner = 0;
		atomic_mutex_unlock(&m.mutex);
		// outside the lock
	}
}
atomic_recursive_mutex_try_lock :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
	tid := current_thread_id();
	if m.owner == tid {
		// Already owned by this thread, so re-entry always succeeds
		m.recursion += 1;
		return true;
	}
	if !atomic_mutex_try_lock(&m.mutex) {
		return false;
	}
	// inside the lock
	m.owner = tid;
	m.recursion += 1;
	return true;
}
// Example:
//
//	if atomic_recursive_mutex_guard(&m) {
//		...
//	}
//
@(deferred_in=atomic_recursive_mutex_unlock)
atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
	atomic_recursive_mutex_lock(m);
	return true;
}
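
// Re-entrancy sketch (illustrative only; `report` and `report_line` are
// hypothetical names): the same thread may lock again without deadlocking,
// and the inner mutex is released once the recursion count returns to zero.
//
//	report :: proc(m: ^Atomic_Recursive_Mutex) {
//		atomic_recursive_mutex_lock(m);
//		defer atomic_recursive_mutex_unlock(m);
//		report_line(m); // locks m again; recursion == 2 inside
//	}
//
//	report_line :: proc(m: ^Atomic_Recursive_Mutex) {
//		atomic_recursive_mutex_lock(m);
//		defer atomic_recursive_mutex_unlock(m);
//		// ...
//	}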
@(private="file")
Queue_Item :: struct {
	next:  ^Queue_Item,
	futex: i32,
}

@(private="file")
queue_item_wait :: proc(item: ^Queue_Item) {
	for atomic_load_acquire(&item.futex) == 0 {
		// TODO(bill): Use a Futex here for Linux to improve performance and error handling
		cpu_relax();
	}
}

@(private="file")
queue_item_signal :: proc(item: ^Queue_Item) {
	atomic_store_release(&item.futex, 1);
	// TODO(bill): Use a Futex here for Linux to improve performance and error handling
}
// Atomic_Cond implements a condition variable, a rendezvous point for threads
// waiting for or signalling the occurrence of an event
//
// An Atomic_Cond must not be copied after first use
Atomic_Cond :: struct {
	queue_mutex: Atomic_Mutex,
	queue_head:  ^Queue_Item,
	pending:     bool,
}

atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) {
	waiter := &Queue_Item{}; // lives on this thread's stack until it is woken

	atomic_mutex_lock(&c.queue_mutex);
	waiter.next = c.queue_head;
	c.queue_head = waiter;

	atomic_store(&c.pending, true);
	atomic_mutex_unlock(&c.queue_mutex);

	atomic_mutex_unlock(m);
	queue_item_wait(waiter);
	atomic_mutex_lock(m);
}
atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, timeout: time.Duration) -> bool {
	// TODO(bill): _cond_wait_with_timeout for unix
	return false;
}
atomic_cond_signal :: proc(c: ^Atomic_Cond) {
	if !atomic_load(&c.pending) {
		return;
	}

	atomic_mutex_lock(&c.queue_mutex);
	waiter := c.queue_head;
	if c.queue_head != nil {
		c.queue_head = c.queue_head.next;
	}
	atomic_store(&c.pending, c.queue_head != nil);
	atomic_mutex_unlock(&c.queue_mutex);

	if waiter != nil {
		queue_item_signal(waiter);
	}
}
atomic_cond_broadcast :: proc(c: ^Atomic_Cond) {
	if !atomic_load(&c.pending) {
		return;
	}
	atomic_store(&c.pending, false);

	atomic_mutex_lock(&c.queue_mutex);
	waiters := c.queue_head;
	c.queue_head = nil;
	atomic_mutex_unlock(&c.queue_mutex);

	for waiters != nil {
		// Grab the next pointer before signalling: once a waiter is woken,
		// its stack-allocated Queue_Item may no longer be valid to read
		next := waiters.next;
		queue_item_signal(waiters);
		waiters = next;
	}
}
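
// Classic producer/consumer sketch (illustrative only; `q_mutex`, `q_cond`,
// and `q_items` are hypothetical names). Note that the predicate is re-checked
// in a loop, since a waiter can be woken without the condition holding:
//
//	q_mutex: Atomic_Mutex;
//	q_cond:  Atomic_Cond;
//	q_items: [dynamic]int;
//
//	consume :: proc() -> int {
//		atomic_mutex_lock(&q_mutex);
//		defer atomic_mutex_unlock(&q_mutex);
//		for len(q_items) == 0 {
//			atomic_cond_wait(&q_cond, &q_mutex); // releases q_mutex while blocked
//		}
//		return pop(&q_items);
//	}
//
//	produce :: proc(item: int) {
//		atomic_mutex_lock(&q_mutex);
//		append(&q_items, item);
//		atomic_mutex_unlock(&q_mutex);
//		atomic_cond_signal(&q_cond);
//	}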
// An Atomic_Sema is a counting semaphore.
// When waited upon, it blocks until the internal count is greater than zero, then subtracts one.
// Posting to the semaphore increases the count by one, or by the provided amount.
//
// An Atomic_Sema must not be copied after first use
Atomic_Sema :: struct {
	mutex: Atomic_Mutex,
	cond:  Atomic_Cond,
	count: int,
}
atomic_sema_wait :: proc(s: ^Atomic_Sema) {
	atomic_mutex_lock(&s.mutex);
	defer atomic_mutex_unlock(&s.mutex);

	for s.count == 0 {
		atomic_cond_wait(&s.cond, &s.mutex);
	}

	s.count -= 1;
	if s.count > 0 {
		atomic_cond_signal(&s.cond);
	}
}

atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) {
	atomic_mutex_lock(&s.mutex);
	defer atomic_mutex_unlock(&s.mutex);

	s.count += count;
	atomic_cond_signal(&s.cond);
}
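
// Usage sketch as a bounded-concurrency gate (illustrative only; `worker_slots`
// is a hypothetical name): wait acquires one of N slots, post releases it.
//
//	worker_slots: Atomic_Sema;
//
//	init :: proc() {
//		atomic_sema_post(&worker_slots, 4); // allow 4 concurrent workers
//	}
//
//	do_work :: proc() {
//		atomic_sema_wait(&worker_slots);    // blocks while all slots are taken
//		defer atomic_sema_post(&worker_slots);
//		// ... work limited to 4 threads at a time ...
//	}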