db_unrealistic.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
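  //! This module is intentionally unrealistic: the multi-query and update paths send every
  //! query of a request through one pipeline with only a single sync point.
  //! Related issue: https://github.com/TechEmpower/FrameworkBenchmarks/issues/8790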
  3. #[path = "./db_util.rs"]
  4. mod db_util;
  5. use std::cell::RefCell;
  6. use xitca_postgres::{Execute, iter::AsyncLendingIterator, pipeline::Pipeline, statement::Statement};
  7. use super::{
  8. ser::{Fortune, Fortunes, World},
  9. util::{DB_URL, HandleResult},
  10. };
  11. use db_util::{FORTUNE_STMT, Shared, WORLD_STMT, not_found, sort_update_params, update_query_from_num};
  12. pub struct Client {
  13. cli: xitca_postgres::Client,
  14. shared: RefCell<Shared>,
  15. fortune: Statement,
  16. world: Statement,
  17. updates: Box<[Statement]>,
  18. }
  19. pub async fn create() -> HandleResult<Client> {
  20. let (cli, mut drv) = xitca_postgres::Postgres::new(DB_URL).connect().await?;
  21. tokio::task::spawn(tokio::task::unconstrained(async move {
  22. while drv.try_next().await?.is_some() {}
  23. HandleResult::Ok(())
  24. }));
  25. let world = WORLD_STMT.execute(&cli).await?.leak();
  26. let fortune = FORTUNE_STMT.execute(&cli).await?.leak();
  27. let mut updates = vec![Statement::default()];
  28. for update in (1..=500).map(update_query_from_num) {
  29. let stmt = Statement::named(&update, &[]).execute(&cli).await?.leak();
  30. updates.push(stmt);
  31. }
  32. Ok(Client {
  33. cli,
  34. shared: Default::default(),
  35. world,
  36. fortune,
  37. updates: updates.into_boxed_slice(),
  38. })
  39. }
  40. impl Client {
  41. pub async fn get_world(&self) -> HandleResult<World> {
  42. let id = self.shared.borrow_mut().0.gen_id();
  43. let mut res = self.world.bind([id]).query(&self.cli).await?;
  44. let row = res.try_next().await?.ok_or_else(not_found)?;
  45. Ok(World::new(row.get(0), row.get(1)))
  46. }
  47. pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
  48. let len = num as usize;
  49. let mut res = {
  50. let (ref mut rng, ref mut buf) = *self.shared.borrow_mut();
  51. // unrealistic as all queries are sent with only one sync point.
  52. let mut pipe = Pipeline::unsync_with_capacity_from_buf(len, buf);
  53. (0..num).try_for_each(|_| self.world.bind([rng.gen_id()]).query(&mut pipe))?;
  54. pipe.query(&self.cli)?
  55. };
  56. let mut worlds = Vec::with_capacity(len);
  57. while let Some(mut item) = res.try_next().await? {
  58. let row = item.try_next().await?.ok_or_else(not_found)?;
  59. worlds.push(World::new(row.get(0), row.get(1)));
  60. }
  61. Ok(worlds)
  62. }
  63. pub async fn update(&self, num: u16) -> HandleResult<Vec<World>> {
  64. let len = num as usize;
  65. let mut params = Vec::with_capacity(len);
  66. let mut res = {
  67. let (ref mut rng, ref mut buf) = *self.shared.borrow_mut();
  68. // unrealistic as all queries are sent with only one sync point.
  69. let mut pipe = Pipeline::unsync_with_capacity_from_buf(len + 1, buf);
  70. (0..num).try_for_each(|_| {
  71. let w_id = rng.gen_id();
  72. let r_id = rng.gen_id();
  73. params.push([w_id, r_id]);
  74. self.world.bind([w_id]).query(&mut pipe)
  75. })?;
  76. self.updates[len].bind(sort_update_params(&params)).query(&mut pipe)?;
  77. pipe.query(&self.cli)?
  78. };
  79. let mut worlds = Vec::with_capacity(len);
  80. let mut r_ids = params.into_iter();
  81. while let Some(mut item) = res.try_next().await? {
  82. while let Some(row) = item.try_next().await? {
  83. let r_id = r_ids.next().unwrap()[1];
  84. worlds.push(World::new(row.get(0), r_id))
  85. }
  86. }
  87. Ok(worlds)
  88. }
  89. pub async fn tell_fortune(&self) -> HandleResult<Fortunes> {
  90. let mut items = Vec::with_capacity(32);
  91. items.push(Fortune::new(0, "Additional fortune added at request time."));
  92. let mut res = self.fortune.query(&self.cli).await?;
  93. while let Some(row) = res.try_next().await? {
  94. items.push(Fortune::new(row.get(0), row.get::<String>(1)));
  95. }
  96. items.sort_by(|it, next| it.message.cmp(&next.message));
  97. Ok(Fortunes::new(items))
  98. }
  99. }