|
@@ -1,9 +1,7 @@
|
|
|
use std::fmt::Write;
|
|
|
|
|
|
use xitca_io::bytes::BytesMut;
|
|
|
-use xitca_postgres::{
|
|
|
- pipeline::Pipeline, statement::Statement, AsyncLendingIterator, SharedClient,
|
|
|
-};
|
|
|
+use xitca_postgres::{pipeline::Pipeline, AsyncLendingIterator, Pool, Type};
|
|
|
|
|
|
use super::{
|
|
|
ser::{Fortune, Fortunes, World},
|
|
@@ -11,64 +9,63 @@ use super::{
|
|
|
};
|
|
|
|
|
|
pub struct Client {
|
|
|
- client: SharedClient,
|
|
|
+ pool: Pool,
|
|
|
#[cfg(not(feature = "pg-sync"))]
|
|
|
shared: std::cell::RefCell<Shared>,
|
|
|
#[cfg(feature = "pg-sync")]
|
|
|
shared: std::sync::Mutex<Shared>,
|
|
|
- fortune: Statement,
|
|
|
- world: Statement,
|
|
|
- updates: Box<[Statement]>,
|
|
|
+ updates: Box<[Box<str>]>,
|
|
|
}
|
|
|
|
|
|
type Shared = (Rand, BytesMut);
|
|
|
|
|
|
-pub async fn create() -> HandleResult<Client> {
|
|
|
- let mut client = SharedClient::new(DB_URL.to_string()).await?;
|
|
|
+const FORTUNE_SQL: &str = "SELECT * FROM fortune";
|
|
|
|
|
|
- let fortune = client.prepare_cached("SELECT * FROM fortune", &[]).await?;
|
|
|
+const FORTUNE_SQL_TYPES: &[Type] = &[];
|
|
|
|
|
|
- let world = client
|
|
|
- .prepare_cached("SELECT * FROM world WHERE id=$1", &[])
|
|
|
- .await?;
|
|
|
+const WORLD_SQL: &str = "SELECT * FROM world WHERE id=$1";
|
|
|
|
|
|
- let mut updates = Vec::new();
|
|
|
+const WORLD_SQL_TYPES: &[Type] = &[Type::INT4];
|
|
|
|
|
|
- // a dummy statement as placeholder of 0 index.
|
|
|
- // avoid off by one calculation when using non zero u16 as slicing index.
|
|
|
- updates.push(Statement::default());
|
|
|
+fn update_query(num: usize) -> Box<str> {
|
|
|
+ const PREFIX: &str = "UPDATE world SET randomNumber = w.r FROM (VALUES ";
|
|
|
+ const SUFFIX: &str = ") AS w (i,r) WHERE world.id = w.i";
|
|
|
|
|
|
- for num in 1..=500u16 {
|
|
|
- let mut pl = 1;
|
|
|
- let mut q = String::new();
|
|
|
- q.push_str("UPDATE world SET randomnumber = CASE id ");
|
|
|
- for _ in 1..=num {
|
|
|
- let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1);
|
|
|
- pl += 2;
|
|
|
- }
|
|
|
- q.push_str("ELSE randomnumber END WHERE id IN (");
|
|
|
- for _ in 1..=num {
|
|
|
- let _ = write!(&mut q, "${},", pl);
|
|
|
- pl += 1;
|
|
|
- }
|
|
|
- q.pop();
|
|
|
- q.push(')');
|
|
|
+ let (_, mut query) = (1..=num).fold((1, String::from(PREFIX)), |(idx, mut query), _| {
|
|
|
+ write!(query, "(${}::int,${}::int),", idx, idx + 1).unwrap();
|
|
|
+ (idx + 2, query)
|
|
|
+ });
|
|
|
|
|
|
- let st = client.prepare_cached(&q, &[]).await?;
|
|
|
- updates.push(st);
|
|
|
- }
|
|
|
+ query.pop();
|
|
|
+
|
|
|
+ query.push_str(SUFFIX);
|
|
|
+
|
|
|
+ query.into_boxed_str()
|
|
|
+}
|
|
|
+
|
|
|
+pub async fn create() -> HandleResult<Client> {
|
|
|
+ let pool = Pool::builder(DB_URL).capacity(1).build()?;
|
|
|
|
|
|
let shared = (Rand::default(), BytesMut::new());
|
|
|
|
|
|
+ let updates = core::iter::once(Box::from(""))
|
|
|
+ .chain((1..=500).map(update_query))
|
|
|
+ .collect::<Box<[Box<str>]>>();
|
|
|
+
|
|
|
+ {
|
|
|
+ let mut conn = pool.get().await?;
|
|
|
+ for update in updates.iter().skip(1) {
|
|
|
+ conn.prepare(update, &[]).await?;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
Ok(Client {
|
|
|
- client,
|
|
|
+ pool,
|
|
|
#[cfg(not(feature = "pg-sync"))]
|
|
|
shared: std::cell::RefCell::new(shared),
|
|
|
#[cfg(feature = "pg-sync")]
|
|
|
shared: std::sync::Mutex::new(shared),
|
|
|
- fortune,
|
|
|
- world,
|
|
|
- updates: updates.into_boxed_slice(),
|
|
|
+ updates,
|
|
|
})
|
|
|
}
|
|
|
|
|
@@ -84,29 +81,26 @@ impl Client {
|
|
|
}
|
|
|
|
|
|
pub async fn get_world(&self) -> HandleResult<World> {
|
|
|
+ let mut conn = self.pool.get().await?;
|
|
|
+ let stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
|
|
|
let id = self.shared().0.gen_id();
|
|
|
- self.client
|
|
|
- .query_raw(&self.world, [id])
|
|
|
- .await?
|
|
|
- .try_next()
|
|
|
- .await?
|
|
|
- .map(|row| World::new(row.get_raw(0), row.get_raw(1)))
|
|
|
- .ok_or_else(|| "World does not exist".into())
|
|
|
+ let mut res = conn.consume().query_raw(&stmt, [id])?;
|
|
|
+ let row = res.try_next().await?.ok_or_else(|| "World does not exist")?;
|
|
|
+ Ok(World::new(row.get_raw(0), row.get_raw(1)))
|
|
|
}
|
|
|
|
|
|
pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
|
|
|
let len = num as usize;
|
|
|
|
|
|
+ let mut conn = self.pool.get().await?;
|
|
|
+ let stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
|
|
|
+
|
|
|
let mut res = {
|
|
|
let (ref mut rng, ref mut buf) = *self.shared();
|
|
|
-
|
|
|
let mut pipe = Pipeline::with_capacity_from_buf(len, buf);
|
|
|
-
|
|
|
- (0..num).try_for_each(|_| pipe.query_raw(&self.world, [rng.gen_id()]))?;
|
|
|
-
|
|
|
- self.client.pipeline(pipe)
|
|
|
- }
|
|
|
- .await?;
|
|
|
+ (0..num).try_for_each(|_| pipe.query_raw(&stmt, [rng.gen_id()]))?;
|
|
|
+ conn.consume().pipeline(pipe)?
|
|
|
+ };
|
|
|
|
|
|
let mut worlds = Vec::with_capacity(len);
|
|
|
|
|
@@ -122,37 +116,34 @@ impl Client {
|
|
|
pub async fn update(&self, num: u16) -> HandleResult<Vec<World>> {
|
|
|
let len = num as usize;
|
|
|
|
|
|
- let mut params = Vec::new();
|
|
|
- params.reserve(len * 3);
|
|
|
+ let update = self.updates.get(len).ok_or_else(|| "num out of bound")?;
|
|
|
+
|
|
|
+ let mut conn = self.pool.get().await?;
|
|
|
+ let world_stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
|
|
|
+ let update_stmt = conn.prepare(&update, &[]).await?;
|
|
|
+
|
|
|
+ let mut params = Vec::with_capacity(len);
|
|
|
|
|
|
let mut res = {
|
|
|
let (ref mut rng, ref mut buf) = *self.shared();
|
|
|
-
|
|
|
let mut pipe = Pipeline::with_capacity_from_buf(len + 1, buf);
|
|
|
-
|
|
|
(0..num).try_for_each(|_| {
|
|
|
let w_id = rng.gen_id();
|
|
|
let r_id = rng.gen_id();
|
|
|
- params.extend([w_id, r_id]);
|
|
|
- pipe.query_raw(&self.world, [w_id])
|
|
|
+ params.push([w_id, r_id]);
|
|
|
+ pipe.query_raw(&world_stmt, [w_id])
|
|
|
})?;
|
|
|
+ pipe.query_raw(&update_stmt, sort_update_params(¶ms))?;
|
|
|
+ conn.consume().pipeline(pipe)?
|
|
|
+ };
|
|
|
|
|
|
- params.extend_from_within(..len);
|
|
|
-
|
|
|
- let st = self.updates.get(len).unwrap();
|
|
|
- pipe.query_raw(st, ¶ms)?;
|
|
|
-
|
|
|
- self.client.pipeline(pipe)
|
|
|
- }
|
|
|
- .await?;
|
|
|
+ let mut worlds = Vec::with_capacity(len);
|
|
|
|
|
|
- let mut worlds = Vec::new();
|
|
|
- worlds.reserve(len);
|
|
|
- let mut r_ids = params.into_iter().skip(1).step_by(2);
|
|
|
+ let mut r_ids = params.into_iter();
|
|
|
|
|
|
while let Some(mut item) = res.try_next().await? {
|
|
|
while let Some(row) = item.try_next().await? {
|
|
|
- let r_id = r_ids.next().unwrap();
|
|
|
+ let r_id = r_ids.next().unwrap()[1];
|
|
|
worlds.push(World::new(row.get_raw(0), r_id))
|
|
|
}
|
|
|
}
|
|
@@ -164,12 +155,46 @@ impl Client {
|
|
|
let mut items = Vec::with_capacity(32);
|
|
|
items.push(Fortune::new(0, "Additional fortune added at request time."));
|
|
|
|
|
|
- let mut res = self.client.query_raw::<[i32; 0]>(&self.fortune, []).await?;
|
|
|
+ let mut conn = self.pool.get().await?;
|
|
|
+ let stmt = conn.prepare(FORTUNE_SQL, FORTUNE_SQL_TYPES).await?;
|
|
|
+ let mut res = conn.consume().query_raw::<[i32; 0]>(&stmt, [])?;
|
|
|
+
|
|
|
while let Some(row) = res.try_next().await? {
|
|
|
items.push(Fortune::new(row.get_raw(0), row.get_raw::<String>(1)));
|
|
|
}
|
|
|
+
|
|
|
items.sort_by(|it, next| it.message.cmp(&next.message));
|
|
|
|
|
|
Ok(Fortunes::new(items))
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+fn sort_update_params(params: &Vec<[i32; 2]>) -> impl ExactSizeIterator<Item = i32> {
|
|
|
+ let mut params = params.clone();
|
|
|
+ params.sort_by(|a, b| a[0].cmp(&b[0]));
|
|
|
+
|
|
|
+ struct ParamIter<I>(I);
|
|
|
+
|
|
|
+ impl<I> Iterator for ParamIter<I>
|
|
|
+ where
|
|
|
+ I: Iterator,
|
|
|
+ {
|
|
|
+ type Item = I::Item;
|
|
|
+
|
|
|
+ #[inline]
|
|
|
+ fn next(&mut self) -> Option<Self::Item> {
|
|
|
+ self.0.next()
|
|
|
+ }
|
|
|
+
|
|
|
+ #[inline]
|
|
|
+ fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
+ self.0.size_hint()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // impl depends on compiler optimization to flat Vec<[T]> to Vec<T> when inferring
|
|
|
+ // it's size hint. possible to cause runtime panic.
|
|
|
+ impl<I> ExactSizeIterator for ParamIter<I> where I: Iterator {}
|
|
|
+
|
|
|
+ ParamIter(params.into_iter().flatten())
|
|
|
+}
|