|
@@ -1,9 +1,9 @@
|
|
|
-use std::{collections::HashMap, fmt::Write};
|
|
|
+use std::fmt::Write;
|
|
|
|
|
|
+use xitca_io::bytes::BytesMut;
|
|
|
use xitca_postgres::{
|
|
|
pipeline::Pipeline, statement::Statement, AsyncLendingIterator, SharedClient,
|
|
|
};
|
|
|
-use xitca_unsafe_collection::no_hash::NoHashBuilder;
|
|
|
|
|
|
use super::{
|
|
|
ser::{Fortune, Fortunes, World},
|
|
@@ -13,14 +13,16 @@ use super::{
|
|
|
pub struct Client {
|
|
|
client: SharedClient,
|
|
|
#[cfg(not(feature = "pg-sync"))]
|
|
|
- rng: std::cell::RefCell<Rand>,
|
|
|
+ shared: std::cell::RefCell<Shared>,
|
|
|
#[cfg(feature = "pg-sync")]
|
|
|
- rng: std::sync::Mutex<Rand>,
|
|
|
+ shared: std::sync::Mutex<Shared>,
|
|
|
fortune: Statement,
|
|
|
world: Statement,
|
|
|
- updates: HashMap<u16, Statement, NoHashBuilder>,
|
|
|
+ updates: Box<[Statement]>,
|
|
|
}
|
|
|
|
|
|
+type Shared = (Rand, BytesMut);
|
|
|
+
|
|
|
pub async fn create() -> HandleResult<Client> {
|
|
|
let mut client = SharedClient::new(DB_URL.to_string()).await?;
|
|
|
|
|
@@ -30,7 +32,11 @@ pub async fn create() -> HandleResult<Client> {
|
|
|
.prepare_cached("SELECT * FROM world WHERE id=$1", &[])
|
|
|
.await?;
|
|
|
|
|
|
- let mut updates = HashMap::default();
|
|
|
+ let mut updates = Vec::new();
|
|
|
+
|
|
|
+ // a dummy statement as placeholder of 0 index.
|
|
|
+ // avoid off by one calculation when using non zero u16 as slicing index.
|
|
|
+ updates.push(Statement::default());
|
|
|
|
|
|
for num in 1..=500u16 {
|
|
|
let mut pl = 1;
|
|
@@ -49,34 +55,36 @@ pub async fn create() -> HandleResult<Client> {
|
|
|
q.push(')');
|
|
|
|
|
|
let st = client.prepare_cached(&q, &[]).await?;
|
|
|
- updates.insert(num, st);
|
|
|
+ updates.push(st);
|
|
|
}
|
|
|
|
|
|
+ let shared = (Rand::default(), BytesMut::new());
|
|
|
+
|
|
|
Ok(Client {
|
|
|
client,
|
|
|
#[cfg(not(feature = "pg-sync"))]
|
|
|
- rng: std::cell::RefCell::new(Rand::default()),
|
|
|
+ shared: std::cell::RefCell::new(shared),
|
|
|
#[cfg(feature = "pg-sync")]
|
|
|
- rng: std::sync::Mutex::new(Rand::default()),
|
|
|
+ shared: std::sync::Mutex::new(shared),
|
|
|
fortune,
|
|
|
world,
|
|
|
- updates,
|
|
|
+ updates: updates.into_boxed_slice(),
|
|
|
})
|
|
|
}
|
|
|
|
|
|
impl Client {
|
|
|
#[cfg(not(feature = "pg-sync"))]
|
|
|
- fn borrow_rng_mut(&self) -> std::cell::RefMut<'_, Rand> {
|
|
|
- self.rng.borrow_mut()
|
|
|
+ fn shared(&self) -> std::cell::RefMut<'_, Shared> {
|
|
|
+ self.shared.borrow_mut()
|
|
|
}
|
|
|
|
|
|
#[cfg(feature = "pg-sync")]
|
|
|
- fn borrow_rng_mut(&self) -> std::sync::MutexGuard<'_, Rand> {
|
|
|
- self.rng.lock().unwrap()
|
|
|
+ fn shared(&self) -> std::sync::MutexGuard<'_, Shared> {
|
|
|
+ self.shared.lock().unwrap()
|
|
|
}
|
|
|
|
|
|
pub async fn get_world(&self) -> HandleResult<World> {
|
|
|
- let id = self.borrow_rng_mut().gen_id();
|
|
|
+ let id = self.shared().0.gen_id();
|
|
|
self.client
|
|
|
.query_raw(&self.world, [id])
|
|
|
.await?
|
|
@@ -87,17 +95,21 @@ impl Client {
|
|
|
}
|
|
|
|
|
|
pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
|
|
|
- let mut pipe = Pipeline::new();
|
|
|
+ let len = num as usize;
|
|
|
+
|
|
|
+ let mut res = {
|
|
|
+ let (ref mut rng, ref mut buf) = *self.shared();
|
|
|
+
|
|
|
+ let mut pipe = Pipeline::<_, false>::with_capacity_from_buf(len, buf);
|
|
|
|
|
|
- {
|
|
|
- let mut rng = self.borrow_rng_mut();
|
|
|
(0..num).try_for_each(|_| pipe.query_raw(&self.world, [rng.gen_id()]))?;
|
|
|
+
|
|
|
+ self.client.pipeline(pipe)
|
|
|
}
|
|
|
+ .await?;
|
|
|
|
|
|
- let mut worlds = Vec::new();
|
|
|
- worlds.reserve(num as usize);
|
|
|
+ let mut worlds = Vec::with_capacity(len);
|
|
|
|
|
|
- let mut res = self.client.pipeline(pipe).await?;
|
|
|
while let Some(mut item) = res.try_next().await? {
|
|
|
while let Some(row) = item.try_next().await? {
|
|
|
worlds.push(World::new(row.get_raw(0), row.get_raw(1)))
|
|
@@ -113,27 +125,31 @@ impl Client {
|
|
|
let mut params = Vec::new();
|
|
|
params.reserve(len * 3);
|
|
|
|
|
|
- let mut pipe = Pipeline::new();
|
|
|
+ let mut res = {
|
|
|
+ let (ref mut rng, ref mut buf) = *self.shared();
|
|
|
+
|
|
|
+ let mut pipe = Pipeline::<_, false>::with_capacity_from_buf(len + 1, buf);
|
|
|
|
|
|
- {
|
|
|
- let mut rng = self.borrow_rng_mut();
|
|
|
(0..num).try_for_each(|_| {
|
|
|
let w_id = rng.gen_id();
|
|
|
let r_id = rng.gen_id();
|
|
|
params.extend([w_id, r_id]);
|
|
|
pipe.query_raw(&self.world, [w_id])
|
|
|
})?;
|
|
|
- }
|
|
|
|
|
|
- params.extend_from_within(..len);
|
|
|
- let st = self.updates.get(&num).unwrap();
|
|
|
- pipe.query_raw(st, ¶ms)?;
|
|
|
+ params.extend_from_within(..len);
|
|
|
+
|
|
|
+ let st = self.updates.get(len).unwrap();
|
|
|
+ pipe.query_raw(st, ¶ms)?;
|
|
|
+
|
|
|
+ self.client.pipeline(pipe)
|
|
|
+ }
|
|
|
+ .await?;
|
|
|
|
|
|
let mut worlds = Vec::new();
|
|
|
worlds.reserve(len);
|
|
|
let mut r_ids = params.into_iter().skip(1).step_by(2);
|
|
|
|
|
|
- let mut res = self.client.pipeline(pipe).await?;
|
|
|
while let Some(mut item) = res.try_next().await? {
|
|
|
while let Some(row) = item.try_next().await? {
|
|
|
let r_id = r_ids.next().unwrap();
|
|
@@ -148,8 +164,8 @@ impl Client {
|
|
|
let mut items = Vec::with_capacity(32);
|
|
|
items.push(Fortune::new(0, "Additional fortune added at request time."));
|
|
|
|
|
|
- let mut stream = self.client.query_raw::<[i32; 0]>(&self.fortune, []).await?;
|
|
|
- while let Some(row) = stream.try_next().await? {
|
|
|
+ let mut res = self.client.query_raw::<[i32; 0]>(&self.fortune, []).await?;
|
|
|
+ while let Some(row) = res.try_next().await? {
|
|
|
items.push(Fortune::new(row.get_raw(0), row.get_raw::<String>(1)));
|
|
|
}
|
|
|
items.sort_by(|it, next| it.message.cmp(&next.message));
|