channel.odin 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889
  1. package sync
  2. import "core:mem"
  3. import "core:time"
  4. import "core:intrinsics"
  5. import "core:math/rand"
  6. _, _ :: time, rand;
  // Channel_Direction tags which operations a typed `Channel` handle permits.
  // The numeric values are significant: procedures in this file constrain
  // sending with `D >= .Both` and receiving with `D <= .Both`.
  Channel_Direction :: enum i8 {
      Recv = -1,
      Both =  0,
      Send =  1,
  }

  // Channel is a typed handle parameterised by element type `T` and a
  // compile-time `Direction`. It holds only a pointer to the `Raw_Channel`,
  // whose fields are reachable directly through `using`.
  Channel :: struct($T: typeid, $Direction := Channel_Direction.Both) {
      using _internal: ^Raw_Channel,
  }
  // channel_init creates the backing `Raw_Channel` for an existing handle.
  //
  // ch:        handle whose `_internal` pointer is overwritten.
  // cap:       requested capacity, forwarded to `raw_channel_create`.
  // allocator: installed as `context.allocator` before the raw channel is created.
  channel_init :: proc(ch: ^$C/Channel($T, $D), cap := 0, allocator := context.allocator) {
      context.allocator = allocator;
      raw := raw_channel_create(size_of(T), align_of(T), cap);
      ch._internal = raw;
  }
  // channel_make returns a new bidirectional (`.Both`) channel of `T`.
  //
  // cap:       requested capacity, forwarded to `raw_channel_create`.
  // allocator: installed as `context.allocator` before the raw channel is created.
  channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Both)) {
      context.allocator = allocator;
      raw := raw_channel_create(size_of(T), align_of(T), cap);
      ch._internal = raw;
      return ch;
  }
  // channel_make_send returns a new channel of `T` whose handle is tagged `.Send`.
  //
  // cap:       requested capacity, forwarded to `raw_channel_create`.
  // allocator: installed as `context.allocator` before the raw channel is created.
  channel_make_send :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Send)) {
      context.allocator = allocator;
      raw := raw_channel_create(size_of(T), align_of(T), cap);
      ch._internal = raw;
      return ch;
  }
  // channel_make_recv returns a new channel of `T` whose handle is tagged `.Recv`.
  //
  // cap:       requested capacity, forwarded to `raw_channel_create`.
  // allocator: installed as `context.allocator` before the raw channel is created.
  channel_make_recv :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Recv)) {
      context.allocator = allocator;
      raw := raw_channel_create(size_of(T), align_of(T), cap);
      ch._internal = raw;
      return ch;
  }
  // channel_destroy hands the handle's raw channel to `raw_channel_destroy`.
  channel_destroy :: proc(ch: $C/Channel($T, $D)) {
      raw := ch._internal;
      raw_channel_destroy(raw);
  }
  // channel_as_send returns a `.Send` handle that shares the raw channel of a
  // `.Both` handle. No new channel is created.
  channel_as_send :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Send)) {
      shared := ch._internal;
      res._internal = shared;
      return res;
  }
  // channel_as_recv returns a `.Recv` handle that shares the raw channel of a
  // `.Both` handle. No new channel is created.
  channel_as_recv :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Recv)) {
      shared := ch._internal;
      res._internal = shared;
      return res;
  }
  // channel_len returns the raw channel's `len` field, or 0 for a nil channel.
  // The field is read without taking the channel mutex.
  channel_len :: proc(ch: $C/Channel($T, $D)) -> int {
      c := ch._internal;
      if c == nil {
          return 0;
      }
      return c.len;
  }
  // channel_cap returns the raw channel's `cap` field, or 0 for a nil channel.
  channel_cap :: proc(ch: $C/Channel($T, $D)) -> int {
      c := ch._internal;
      if c == nil {
          return 0;
      }
      return c.cap;
  }
  // channel_send sends `msg` through `raw_channel_send_impl` in blocking mode.
  // It is only available on handles whose direction is `.Both` or `.Send`.
  // The boolean result of the raw send is discarded.
  channel_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) where D >= .Both {
      payload := msg; // addressable copy; the raw send takes a pointer
      _ = raw_channel_send_impl(ch._internal, &payload, /*block*/true, loc);
  }
  // channel_try_send sends `msg` through `raw_channel_send_impl` in
  // non-blocking mode and returns its result.
  // It is only available on handles whose direction is `.Both` or `.Send`.
  channel_try_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) -> bool where D >= .Both {
      payload := msg; // addressable copy; the raw send takes a pointer
      sent := raw_channel_send_impl(ch._internal, &payload, /*block*/false, loc);
      return sent;
  }
  // channel_recv receives one message while holding the channel mutex.
  // It panics (reported at `loc`) when the channel is nil.
  // It is only available on handles whose direction is `.Both` or `.Recv`.
  channel_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T) where D <= .Both {
      c := ch._internal;
      if c == nil {
          panic(message="cannot recv message; channel is nil", loc=loc);
      }

      mutex_lock(&c.mutex);
      defer mutex_unlock(&c.mutex);

      raw_channel_recv_impl(c, &msg, loc);
      return;
  }
  // channel_try_recv attempts to receive a message.
  //
  // `ok` is true only when the channel is non-nil, its mutex could be taken
  // with `mutex_try_lock`, and `len > 0` at that moment. Otherwise `msg` is
  // left at its zero value.
  channel_try_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T, ok: bool) where D <= .Both {
      c := ch._internal;
      if c == nil {
          return;
      }
      if !mutex_try_lock(&c.mutex) {
          return;
      }
      defer mutex_unlock(&c.mutex);

      if c.len > 0 {
          raw_channel_recv_impl(c, &msg, loc);
          ok = true;
      }
      return;
  }
  // channel_try_recv_ptr is `channel_try_recv` with an out-parameter.
  // On success the message is stored through `msg` when `msg` is non-nil.
  // A nil `msg` still consumes the message and reports success.
  channel_try_recv_ptr :: proc(ch: $C/Channel($T, $D), msg: ^T, loc := #caller_location) -> (ok: bool) where D <= .Both {
      value, received := channel_try_recv(ch, loc);
      if received && msg != nil {
          msg^ = value;
      }
      return received;
  }
  // channel_is_nil reports whether the handle has no raw channel behind it.
  channel_is_nil :: proc(ch: $C/Channel($T, $D)) -> bool {
      raw := ch._internal;
      return raw == nil;
  }
  // channel_is_open reports whether the handle is non-nil and its `closed`
  // flag is unset. The flag is read with a plain (non-atomic) load.
  channel_is_open :: proc(ch: $C/Channel($T, $D)) -> bool {
      c := ch._internal;
      if c == nil {
          return false;
      }
      return !c.closed;
  }
  // channel_eq reports whether two handles point at the same raw channel.
  channel_eq :: proc(a, b: $C/Channel($T, $D)) -> bool {
      lhs, rhs := a._internal, b._internal;
      return lhs == rhs;
  }
  // channel_ne reports whether two handles point at different raw channels.
  channel_ne :: proc(a, b: $C/Channel($T, $D)) -> bool {
      lhs, rhs := a._internal, b._internal;
      return lhs != rhs;
  }
  // channel_can_send forwards to `raw_channel_can_send` for handles whose
  // direction is `.Both` or `.Send`.
  channel_can_send :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D >= .Both {
      ok = raw_channel_can_send(ch._internal);
      return;
  }
  // channel_can_recv forwards to `raw_channel_can_recv` for handles whose
  // direction is `.Both` or `.Recv`.
  channel_can_recv :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D <= .Both {
      ok = raw_channel_can_recv(ch._internal);
      return;
  }
  // channel_peek returns the current message count without taking the mutex.
  // It returns -1 when the channel is nil or closed. Otherwise it returns an
  // atomic load of `len`.
  channel_peek :: proc(ch: $C/Channel($T, $D)) -> int {
      c := ch._internal;
      if c == nil || intrinsics.atomic_load(&c.closed) {
          return -1;
      }
      return intrinsics.atomic_load(&c.len);
  }
  // channel_close forwards to `raw_channel_close`, which panics at `loc` for
  // a nil channel.
  channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) {
      raw := ch._internal;
      raw_channel_close(raw, loc);
  }
  // channel_iterator is the iterator procedure for `for msg in channel_iterator(ch)`.
  //
  // It yields `ok == false` when the channel is nil, or when it is closed and
  // has no messages left. Otherwise it performs a (possibly blocking)
  // `channel_recv` and yields that message.
  // `closed` and `len` are read here without taking the mutex.
  channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both {
      c := ch._internal;
      if c == nil {
          return;
      }
      if c.closed && c.len <= 0 {
          return;
      }
      msg = channel_recv(ch);
      ok = true;
      return;
  }
  // channel_drain forwards to `raw_channel_drain`, which discards all
  // buffered messages.
  channel_drain :: proc(ch: $C/Channel($T, $D)) where D >= .Both {
      raw := ch._internal;
      raw_channel_drain(raw);
  }
  // channel_move forwards every message yielded by `channel_iterator(src)`
  // into `dst` using the blocking `channel_send`.
  //
  // dst: destination; must permit sending (`.Both` or `.Send`).
  // src: source; must permit receiving (`.Both` or `.Recv`).
  //
  // The loop ends when `channel_iterator` reports `ok == false`, i.e. `src`
  // is nil, or closed with no messages left.
  //
  // NOTE: the constraints were previously swapped (`D1 <= .Both, D2 >= .Both`).
  // That contradicted the constraints on `channel_send` and
  // `channel_iterator`, so only `.Both`/`.Both` could instantiate. The
  // parameter list was also missing the comma between `dst` and `src`.
  channel_move :: proc(dst: $C1/Channel($T, $D1), src: $C2/Channel(T, $D2)) where D1 >= .Both, D2 <= .Both {
      for msg in channel_iterator(src) {
          channel_send(dst, msg);
      }
  }
  // Raw_Channel_Wait_Queue is one node of an intrusive singly linked list of
  // waiters. The `select*` procedures in this file allocate the nodes on
  // their own stack and point `state` at a local wait word.
  Raw_Channel_Wait_Queue :: struct {
      next:  ^Raw_Channel_Wait_Queue,
      state: ^uintptr,
  }

  // Raw_Channel is the untyped channel header. The element storage lives in
  // the same allocation, `data_offset` bytes after the start of the header.
  Raw_Channel :: struct {
      closed:      bool,
      ready:       bool, // ready to recv
      data_offset: u16,  // data is stored at the end of this data structure
      elem_size:   u32,
      len:         int,  // number of messages currently stored
      cap:         int,  // requested capacity; 0 when created with cap <= 0
      read:        int,  // slot index of the next message to receive
      write:       int,  // slot index of the next message to store
      mutex:       Mutex,
      cond:        Condition,
      allocator:   mem.Allocator, // allocator used for creation; reused on destroy
      sendq:       ^Raw_Channel_Wait_Queue,
      recvq:       ^Raw_Channel_Wait_Queue,
  }
  // raw_channel_wait_queue_insert pushes `val` onto the front of the list
  // whose head pointer is `head^`.
  raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
      previous_head := head^;
      val.next = previous_head;
      head^ = val;
  }
  // raw_channel_wait_queue_remove unlinks `val` from the list whose head
  // pointer is `head^`. It does nothing when `val` is not in the list.
  //
  // `link` always points at the pointer that refers to the current node
  // (first the head, then each node's `next`), so unlinking is a single store.
  //
  // NOTE: the post-loop check used to be `p != nil`, which is always true.
  // When `val` was absent the loop stopped with `p^ == nil` and `p^.next`
  // dereferenced nil. The check must be on `link^`.
  raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
      link := head;
      for link^ != nil && link^ != val {
          link = &link^.next;
      }
      if link^ != nil {
          link^ = link^.next;
      }
  }
  // raw_channel_create allocates a channel header and its element storage as
  // one block from `context.allocator`.
  //
  // elem_size:  size in bytes of one element; asserted to fit in a u32.
  // elem_align: alignment of one element; the storage begins at the header
  //             size rounded up to this alignment.
  // cap:        requested capacity. Storage always has room for at least one
  //             element; the stored `cap` field is `max(cap, 0)`.
  //
  // Returns nil when the allocation fails. Fields not assigned here (`ready`,
  // `sendq`, `recvq`) keep whatever `mem.alloc` returned — presumably zeroed
  // memory; confirm against the allocator in use.
  raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel {
      assert(int(u32(elem_size)) == elem_size);

      header_size := mem.align_forward_int(size_of(Raw_Channel), elem_align);
      storage_offset := uintptr(header_size);
      total_size := header_size + elem_size * max(cap, 1);
      alignment := max(elem_align, align_of(Raw_Channel));

      c := (^Raw_Channel)(mem.alloc(total_size, alignment));
      if c == nil {
          return nil;
      }

      c.data_offset = u16(storage_offset);
      c.elem_size = u32(elem_size);
      c.len = 0;
      c.cap = max(cap, 0);
      c.read = 0;
      c.write = 0;

      mutex_init(&c.mutex);
      condition_init(&c.cond, &c.mutex);

      c.allocator = context.allocator;
      c.closed = false;
      return c;
  }
  // raw_channel_destroy marks the channel closed, destroys its condition and
  // mutex, and frees the block with the allocator recorded at creation.
  // A nil channel is ignored.
  raw_channel_destroy :: proc(c: ^Raw_Channel) {
      if c != nil {
          context.allocator = c.allocator;
          intrinsics.atomic_store(&c.closed, true);

          condition_destroy(&c.cond);
          mutex_destroy(&c.mutex);
          free(c);
      }
  }
  // raw_channel_close sets the `closed` flag under the channel mutex, then
  // wakes the waiters on both wait queues and on the condition.
  // It panics (reported at `loc`) when `c` is nil.
  raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) {
      if c == nil {
          panic(message="cannot close nil channel", loc=loc);
      }

      mutex_lock(&c.mutex);

      intrinsics.atomic_store(&c.closed, true);

      // Release readers and writers
      raw_channel_wait_queue_broadcast(c.recvq);
      raw_channel_wait_queue_broadcast(c.sendq);
      condition_broadcast(&c.cond);

      mutex_unlock(&c.mutex);
  }
  // raw_channel_send_impl copies `elem_size` bytes from `msg` into the
  // channel's storage.
  //
  // c:     target channel; panics at `loc` when nil or already closed. The
  //        closed check happens before the mutex is taken.
  // msg:   pointer to the bytes to copy.
  // block: when false, returns false instead of waiting.
  //
  // Behaviour once the mutex is held:
  //   * cap > 0: waits on the condition while `len >= cap`; a non-blocking
  //     call returns false in that situation instead.
  //   * cap == 0 and len > 0: a non-blocking call returns false; a blocking
  //     call waits on the condition once (not in a loop).
  //   * cap == 0 and len == 0: a non-blocking call returns false; a blocking
  //     call proceeds immediately.
  // After storing, it signals the condition and the receive wait queue, then
  // returns true.
  raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool {
      // Copies one element into the slot at `write` and advances the ring.
      enqueue :: proc(c: ^Raw_Channel, src: rawptr) {
          stride := int(c.elem_size);
          base := uintptr(c) + uintptr(c.data_offset);
          slot := base + uintptr(c.write * stride);
          mem.copy(rawptr(slot), src, stride);
          c.len += 1;
          c.write = (c.write + 1) % max(c.cap, 1);
      }

      if c == nil {
          panic(message="cannot send message; channel is nil", loc=loc);
      }
      if c.closed {
          panic(message="cannot send message; channel is closed", loc=loc);
      }

      mutex_lock(&c.mutex);
      defer mutex_unlock(&c.mutex);

      switch {
      case c.cap > 0:
          if c.len >= c.cap && !block {
              return false;
          }
          for c.len >= c.cap {
              condition_wait_for(&c.cond);
          }
      case c.len > 0: // TODO(bill): determine correct behaviour
          if !block {
              return false;
          }
          condition_wait_for(&c.cond);
      case c.len == 0 && !block:
          return false;
      }

      enqueue(c, msg);
      condition_signal(&c.cond);
      raw_channel_wait_queue_signal(c.recvq);

      return true;
  }
  // raw_channel_recv_impl copies the oldest stored element into `res`,
  // waiting on the condition while the channel is empty.
  //
  // The procedure itself does not take the mutex; both callers in this file
  // lock it first. It panics (reported at `loc`) when `c` is nil.
  //
  // While waiting it keeps `ready` set and signals the send wait queue before
  // each wait. After receiving it signals the condition when the channel was
  // created with cap <= 0, or when the buffer has just left the full state
  // (`len == cap - 1`).
  raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) {
      // Copies one element out of the slot at `read` and advances the ring.
      dequeue :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) {
          if c.len < 1 {
              panic(message="cannot recv message; channel is empty", loc=loc);
          }
          stride := int(c.elem_size);
          base := uintptr(c) + uintptr(c.data_offset);
          slot := base + uintptr(c.read * stride);

          c.len -= 1;
          mem.copy(dst, rawptr(slot), stride);
          c.read = (c.read + 1) % max(c.cap, 1);
      }

      if c == nil {
          panic(message="cannot recv message; channel is nil", loc=loc);
      }

      intrinsics.atomic_store(&c.ready, true);
      for c.len < 1 {
          raw_channel_wait_queue_signal(c.sendq);
          condition_wait_for(&c.cond);
      }
      intrinsics.atomic_store(&c.ready, false);

      dequeue(c, res, loc);

      // NOTE(bill): Only signal on the last one
      if c.cap <= 0 || c.len == c.cap - 1 {
          condition_signal(&c.cond);
      }
  }
  // raw_channel_can_send reports, under the channel mutex, whether a send is
  // currently expected to go through.
  //
  // It is false for a nil or closed channel. Otherwise `ready` must be set
  // and the channel must have room: `len < cap` when cap > 0, `len == 0`
  // otherwise.
  raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) {
      if c == nil {
          return false;
      }

      mutex_lock(&c.mutex);
      defer mutex_unlock(&c.mutex);

      if c.closed {
          ok = false;
      } else if c.cap > 0 {
          ok = c.ready && c.len < c.cap;
      } else {
          ok = c.ready && c.len == 0;
      }
      return;
  }
  // raw_channel_can_recv reports, under the channel mutex, whether at least
  // one message is stored. It is false for a nil channel.
  raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) {
      if c == nil {
          return false;
      }

      mutex_lock(&c.mutex);
      defer mutex_unlock(&c.mutex);

      ok = c.len > 0;
      return;
  }
  // raw_channel_drain discards all stored messages by resetting `len`,
  // `read` and `write` to zero under the channel mutex.
  // A nil channel is ignored. No waiter is signalled.
  raw_channel_drain :: proc(c: ^Raw_Channel) {
      if c != nil {
          mutex_lock(&c.mutex);
          c.len, c.read, c.write = 0, 0, 0;
          mutex_unlock(&c.mutex);
      }
  }
  // Upper bound on the number of channels a single `select*` call accepts.
  // The select procedures size their stack arrays with it and assert on it.
  MAX_SELECT_CHANNELS :: 64;

  // Timeout value that `select` passes to `select_timeout`.
  SELECT_MAX_TIMEOUT :: max(time.Duration);

  // Select_Command names the operation a select entry waits for.
  Select_Command :: enum {
      Recv,
      Send,
  }

  // Select_Channel pairs a raw channel with the operation to wait for on it.
  Select_Channel :: struct {
      channel: ^Raw_Channel,
      command: Select_Command,
  }
  // select is `select_timeout` called with `SELECT_MAX_TIMEOUT`.
  select :: proc(channels: ..Select_Channel) -> (index: int) {
      index = select_timeout(SELECT_MAX_TIMEOUT, ..channels);
      return;
  }
  // select_timeout picks one entry of `channels` whose command can currently
  // proceed and returns its index.
  //
  // timeout:  passed to `raw_channel_wait_queue_wait_on` when nothing is
  //           ready on the first pass.
  // channels: 1..MAX_SELECT_CHANNELS entries; entries with a nil channel are
  //           never selected.
  //
  // Returns the index of a ready entry, chosen pseudo-randomly when several
  // are ready. Returns -1 when nothing is ready even after waiting (for
  // example because the timeout expired). Panics when `channels` is empty.
  //
  // The procedure only reports readiness; it does not send or receive.
  //
  // NOTE: the -1 return after waiting used to be guarded by
  // `timeout == SELECT_MAX_TIMEOUT`, so an expired finite timeout fell into
  // `assert(count != 0)` (and `i % 0` with assertions disabled). It now
  // returns -1 whenever no entry is ready.
  select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) {
      // Reports whether the entry's command can proceed right now.
      // Both raw_channel_can_* procedures return false for a nil channel.
      is_ready :: proc(entry: Select_Channel) -> bool {
          switch entry.command {
          case .Recv: return raw_channel_can_recv(entry.channel);
          case .Send: return raw_channel_can_send(entry.channel);
          }
          return false;
      }

      if len(channels) == 0 {
          panic("sync: select with no channels");
      }
      assert(len(channels) <= MAX_SELECT_CHANNELS);

      backing: [MAX_SELECT_CHANNELS]int;
      queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
      candidates := backing[:len(channels)];
      count := u32(0);

      for c, i in channels {
          if is_ready(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          // Nothing ready: park one wait-queue node per channel, all sharing
          // a single wait word, then wait on that word.
          wait_state: uintptr = 0;
          for _, i in channels {
              queues[i].state = &wait_state;
          }

          for c, i in channels {
              if c.channel == nil {
                  continue;
              }
              q := &queues[i];
              switch c.command {
              case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q);
              case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q);
              }
          }

          raw_channel_wait_queue_wait_on(&wait_state, timeout);

          // The nodes live on this stack frame, so unlink them before returning.
          for c, i in channels {
              if c.channel == nil {
                  continue;
              }
              q := &queues[i];
              switch c.command {
              case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q);
              case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q);
              }
          }

          for c, i in channels {
              if is_ready(c) {
                  candidates[count] = i;
                  count += 1;
              }
          }

          if count == 0 {
              index = -1;
              return;
          }
      }

      // Pick one of the ready entries, seeding the generator from the clock.
      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng) % count;
      index = candidates[pick];
      return;
  }
  // select_recv waits until one of `channels` has a message and returns its
  // index, chosen pseudo-randomly when several are ready.
  //
  // It only reports readiness; no message is received.
  // Panics when `channels` is empty; asserts on MAX_SELECT_CHANNELS and on
  // something being ready after the wait.
  select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
      if len(channels) == 0 {
          panic("sync: select with no channels");
      }
      assert(len(channels) <= MAX_SELECT_CHANNELS);

      backing: [MAX_SELECT_CHANNELS]int;
      queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
      candidates := backing[:len(channels)];
      count := u32(0);

      for c, i in channels {
          if raw_channel_can_recv(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          // Nothing ready: park a node on every receive queue and wait.
          state: uintptr;
          for c, i in channels {
              node := &queues[i];
              node.state = &state;
              raw_channel_wait_queue_insert(&c.recvq, node);
          }

          raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);

          for c, i in channels {
              raw_channel_wait_queue_remove(&c.recvq, &queues[i]);
          }

          for c, i in channels {
              if raw_channel_can_recv(c) {
                  candidates[count] = i;
                  count += 1;
              }
          }
          assert(count != 0);
      }

      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng);
      index = candidates[pick % count];
      return;
  }
  // select_recv_msg waits until one of `channels` has a message, receives
  // from a pseudo-randomly chosen ready channel with `channel_recv`, and
  // returns the message together with that channel's index.
  //
  // Panics when `channels` is empty; asserts on MAX_SELECT_CHANNELS and on
  // something being ready after the wait.
  select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) {
      if len(channels) == 0 {
          panic("sync: select with no channels");
      }
      assert(len(channels) <= MAX_SELECT_CHANNELS);

      queues:     [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
      candidates: [MAX_SELECT_CHANNELS]int;
      count := u32(0);

      for c, i in channels {
          if raw_channel_can_recv(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          // Nothing ready: park a node on every receive queue and wait.
          state: uintptr;
          for c, i in channels {
              node := &queues[i];
              node.state = &state;
              raw_channel_wait_queue_insert(&c.recvq, node);
          }

          raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);

          for c, i in channels {
              raw_channel_wait_queue_remove(&c.recvq, &queues[i]);
          }

          for c, i in channels {
              if raw_channel_can_recv(c) {
                  candidates[count] = i;
                  count += 1;
              }
          }
          assert(count != 0);
      }

      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng);
      index = candidates[pick % count];

      msg = channel_recv(channels[index]);
      return;
  }
  // select_send_msg waits until one of `channels` can accept a send, sends
  // `msg` to a pseudo-randomly chosen ready channel with `channel_send`, and
  // returns that channel's index.
  //
  // Panics when `channels` is empty; asserts on MAX_SELECT_CHANNELS and on
  // something being ready after the wait.
  //
  // NOTE: this used to test `raw_channel_can_recv` and park on `recvq`,
  // copied from the receive variant, so it selected channels that had data to
  // read rather than room to write. It now uses `raw_channel_can_send` and
  // `sendq`, matching `select_send`.
  // The old `msg != nil` guard was dropped: it does not compile for element
  // types that cannot be compared with nil, and for a nil message it skipped
  // the send while still reporting an index. `select_try_send_msg` likewise
  // sends unconditionally.
  select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) {
      if len(channels) == 0 {
          panic("sync: select with no channels");
      }
      assert(len(channels) <= MAX_SELECT_CHANNELS);

      backing: [MAX_SELECT_CHANNELS]int;
      queues:  [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
      candidates := backing[:len(channels)];
      count := u32(0);

      for c, i in channels {
          if raw_channel_can_send(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          // Nothing ready: park a node on every send queue and wait.
          state: uintptr;
          for c, i in channels {
              q := &queues[i];
              q.state = &state;
              raw_channel_wait_queue_insert(&c.sendq, q);
          }

          raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);

          for c, i in channels {
              q := &queues[i];
              raw_channel_wait_queue_remove(&c.sendq, q);
          }

          for c, i in channels {
              if raw_channel_can_send(c) {
                  candidates[count] = i;
                  count += 1;
              }
          }
          assert(count != 0);
      }

      t := time.now();
      r := rand.create(transmute(u64)t);
      i := rand.uint32(&r);
      index = candidates[i % count];

      channel_send(channels[index], msg);
      return;
  }
  // select_send waits until one of `channels` can accept a send and returns
  // its index, chosen pseudo-randomly when several are ready.
  //
  // It only reports readiness; nothing is sent.
  // Panics when `channels` is empty; asserts on MAX_SELECT_CHANNELS and on
  // something being ready after the wait.
  select_send :: proc(channels: ..^Raw_Channel) -> (index: int) {
      if len(channels) == 0 {
          panic("sync: select with no channels");
      }
      assert(len(channels) <= MAX_SELECT_CHANNELS);

      candidates: [MAX_SELECT_CHANNELS]int;
      queues:     [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
      count := u32(0);

      for c, i in channels {
          if raw_channel_can_send(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          // Nothing ready: park a node on every send queue and wait.
          state: uintptr;
          for c, i in channels {
              node := &queues[i];
              node.state = &state;
              raw_channel_wait_queue_insert(&c.sendq, node);
          }

          raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);

          for c, i in channels {
              raw_channel_wait_queue_remove(&c.sendq, &queues[i]);
          }

          for c, i in channels {
              if raw_channel_can_send(c) {
                  candidates[count] = i;
                  count += 1;
              }
          }
          assert(count != 0);
      }

      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng);
      index = candidates[pick % count];
      return;
  }
  // select_try is the non-waiting form of `select`: it returns the index of
  // an entry whose command can proceed right now (chosen pseudo-randomly
  // among the ready ones), or -1 when none can.
  //
  // Panics when `channels` is empty; asserts on MAX_SELECT_CHANNELS.
  select_try :: proc(channels: ..Select_Channel) -> (index: int) {
      if len(channels) == 0 {
          panic("sync: select with no channels");
      }
      assert(len(channels) <= MAX_SELECT_CHANNELS);

      backing: [MAX_SELECT_CHANNELS]int;
      candidates := backing[:len(channels)];
      count := u32(0);

      for entry, i in channels {
          ready: bool;
          switch entry.command {
          case .Recv: ready = raw_channel_can_recv(entry.channel);
          case .Send: ready = raw_channel_can_send(entry.channel);
          }
          if ready {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          return -1;
      }

      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng);
      index = candidates[pick % count];
      return;
  }
  // select_try_recv returns the index of a channel that currently has a
  // message (chosen pseudo-randomly among the ready ones), or -1 when none
  // has — including when `channels` is empty. Nothing is received.
  select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
      n := len(channels);
      if n == 0 {
          return -1;
      }
      if n == 1 {
          return 0 if raw_channel_can_recv(channels[0]) else -1;
      }
      assert(n <= MAX_SELECT_CHANNELS);

      candidates: [MAX_SELECT_CHANNELS]int;
      count := u32(0);
      for c, i in channels {
          if raw_channel_can_recv(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          return -1;
      }

      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng);
      index = candidates[pick % count];
      return;
  }
  // select_try_send returns the index of a channel that can currently accept
  // a send (chosen pseudo-randomly among the ready ones), or -1 when none
  // can — including when `channels` is empty. Nothing is sent.
  select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check {
      n := len(channels);
      if n == 0 {
          return -1;
      }
      if n == 1 {
          return 0 if raw_channel_can_send(channels[0]) else -1;
      }
      assert(n <= MAX_SELECT_CHANNELS);

      candidates: [MAX_SELECT_CHANNELS]int;
      count := u32(0);
      for c, i in channels {
          if raw_channel_can_send(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          return -1;
      }

      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng);
      index = candidates[pick % count];
      return;
  }
  // select_try_recv_msg receives from one of `channels` if any currently has
  // a message.
  //
  // Returns the message and the index of the channel it came from, or a
  // zero-valued `msg` with index -1 when no channel is ready (including when
  // `channels` is empty).
  //
  // With exactly one channel the attempt is a single `channel_try_recv`.
  // With several, a ready channel is chosen pseudo-randomly and read with
  // `channel_recv`.
  //
  // NOTE: in the single-channel case a failed `channel_try_recv` used to
  // leave `index` at its zero value, which callers could not tell apart from
  // a successful receive on channel 0. `index` now starts at -1.
  select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) {
      index = -1;

      switch len(channels) {
      case 0:
          return;
      case 1:
          ok: bool;
          if msg, ok = channel_try_recv(channels[0]); ok {
              index = 0;
          }
          return;
      }
      assert(len(channels) <= MAX_SELECT_CHANNELS);

      candidates: [MAX_SELECT_CHANNELS]int;
      count := u32(0);
      for c, i in channels {
          if channel_can_recv(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          return;
      }

      t := time.now();
      r := rand.create(transmute(u64)t);
      i := rand.uint32(&r);
      index = candidates[i % count];

      msg = channel_recv(channels[index]);
      return;
  }
  // select_try_send_msg sends `msg` to one of `channels` if any can currently
  // accept it.
  //
  // Returns the index of the channel sent to, or -1 when none is ready
  // (including when `channels` is empty). With exactly one channel the
  // attempt is a single `channel_try_send`. With several, a ready channel is
  // chosen pseudo-randomly and written with `channel_send`.
  select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) {
      index = -1;

      n := len(channels);
      if n == 0 {
          return;
      }
      if n == 1 {
          if channel_try_send(channels[0], msg) {
              index = 0;
          }
          return;
      }
      assert(n <= MAX_SELECT_CHANNELS);

      candidates: [MAX_SELECT_CHANNELS]int;
      count := u32(0);
      for c, i in channels {
          if raw_channel_can_send(c) {
              candidates[count] = i;
              count += 1;
          }
      }

      if count == 0 {
          index = -1;
          return;
      }

      now := time.now();
      rng := rand.create(transmute(u64)now);
      pick := rand.uint32(&rng);
      index = candidates[pick % count];

      channel_send(channels[index], msg);
      return;
  }