|
@@ -1,147 +1,190 @@
|
|
|
-//! Db executor actor
|
|
|
-
|
|
|
-use std::io;
|
|
|
+use std::{borrow::Cow, collections::HashMap, fmt::Write, io, rc::Rc};
|
|
|
+
|
|
|
+use actix_http::{body::BoxBody, Response};
|
|
|
+use actix_rt::pin;
|
|
|
+use bytes::{Bytes, BytesMut};
|
|
|
+use futures::{
|
|
|
+ stream::futures_unordered::FuturesUnordered, FutureExt, StreamExt, TryStreamExt,
|
|
|
+};
|
|
|
+use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
|
|
|
+use tokio_postgres::{connect, types::ToSql, Client, NoTls, Statement};
|
|
|
+
|
|
|
+use crate::{
|
|
|
+ models::World,
|
|
|
+ utils::{Fortune, Writer},
|
|
|
+};
|
|
|
+
|
|
|
+#[derive(Debug)]
|
|
|
+pub enum PgError {
|
|
|
+ Io(io::Error),
|
|
|
+ Pg(tokio_postgres::Error),
|
|
|
+}
|
|
|
|
|
|
-use actix::prelude::*;
|
|
|
-use diesel::prelude::*;
|
|
|
-use diesel::result::Error;
|
|
|
-use rand::rngs::SmallRng;
|
|
|
-use rand::{Rng, SeedableRng};
|
|
|
+impl From<io::Error> for PgError {
|
|
|
+ fn from(err: io::Error) -> Self {
|
|
|
+ PgError::Io(err)
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
-use crate::models;
|
|
|
+impl From<tokio_postgres::Error> for PgError {
|
|
|
+ fn from(err: tokio_postgres::Error) -> Self {
|
|
|
+ PgError::Pg(err)
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
-pub struct DbExecutor {
|
|
|
- conn: PgConnection,
|
|
|
- rng: SmallRng,
|
|
|
+impl From<PgError> for Response<BoxBody> {
|
|
|
+ fn from(_err: PgError) -> Self {
|
|
|
+ Response::internal_server_error()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-impl Actor for DbExecutor {
|
|
|
- type Context = SyncContext<Self>;
|
|
|
+/// Postgres interface
|
|
|
+pub struct PgConnection {
|
|
|
+ client: Client,
|
|
|
+ fortune: Statement,
|
|
|
+ world: Statement,
|
|
|
+ updates: HashMap<u16, Statement>,
|
|
|
}
|
|
|
|
|
|
-impl DbExecutor {
|
|
|
- pub fn new(db_url: &str) -> DbExecutor {
|
|
|
- DbExecutor {
|
|
|
- conn: PgConnection::establish(db_url)
|
|
|
- .expect(&format!("Error connecting to {}", db_url)),
|
|
|
- rng: SmallRng::from_entropy(),
|
|
|
+impl PgConnection {
|
|
|
+ pub async fn connect(db_url: &str) -> Rc<PgConnection> {
|
|
|
+ let (cl, conn) = connect(db_url, NoTls)
|
|
|
+ .await
|
|
|
+ .expect("can not connect to postgresql");
|
|
|
+
|
|
|
+ actix_rt::spawn(conn.map(|_| ()));
|
|
|
+
|
|
|
+ let fortune = cl.prepare("SELECT * FROM fortune").await.unwrap();
|
|
|
+ let mut updates = HashMap::new();
|
|
|
+
|
|
|
+ for num in 1..=500u16 {
|
|
|
+ let mut pl = 1;
|
|
|
+ let mut q = String::new();
|
|
|
+
|
|
|
+ q.push_str("UPDATE world SET randomnumber = CASE id ");
|
|
|
+
|
|
|
+ for _ in 1..=num {
|
|
|
+ let _ = write!(q, "when ${} then ${} ", pl, pl + 1);
|
|
|
+ pl += 2;
|
|
|
+ }
|
|
|
+
|
|
|
+ q.push_str("ELSE randomnumber END WHERE id IN (");
|
|
|
+
|
|
|
+ for _ in 1..=num {
|
|
|
+ let _ = write!(q, "${},", pl);
|
|
|
+ pl += 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ q.pop();
|
|
|
+ q.push(')');
|
|
|
+
|
|
|
+ updates.insert(num, cl.prepare(&q).await.unwrap());
|
|
|
}
|
|
|
+
|
|
|
+ let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
|
|
|
+
|
|
|
+ Rc::new(PgConnection {
|
|
|
+ client: cl,
|
|
|
+ fortune,
|
|
|
+ world,
|
|
|
+ updates,
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-pub struct RandomWorld;
|
|
|
+impl PgConnection {
|
|
|
+ async fn query_one_world(&self, id: i32) -> Result<World, PgError> {
|
|
|
+ let stream = self.client.query_raw(&self.world, &[&id]).await?;
|
|
|
+ pin!(stream);
|
|
|
+ let row = stream.next().await.unwrap()?;
|
|
|
+ Ok(World::new(row.get(0), row.get(1)))
|
|
|
+ }
|
|
|
|
|
|
-impl Message for RandomWorld {
|
|
|
- type Result = io::Result<models::World>;
|
|
|
-}
|
|
|
+ pub async fn get_world(&self) -> Result<Bytes, PgError> {
|
|
|
+ let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
|
|
|
|
|
|
-impl Handler<RandomWorld> for DbExecutor {
|
|
|
- type Result = io::Result<models::World>;
|
|
|
+ let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
|
|
|
|
|
|
- fn handle(&mut self, _: RandomWorld, _: &mut Self::Context) -> Self::Result {
|
|
|
- use crate::schema::world::dsl::*;
|
|
|
+ let world = self.query_one_world(random_id).await?;
|
|
|
+ let mut body = BytesMut::with_capacity(40);
|
|
|
+ serde_json::to_writer(Writer(&mut body), &world).unwrap();
|
|
|
|
|
|
- let random_id = self.rng.gen_range(1, 10_001);
|
|
|
- match world
|
|
|
- .filter(id.eq(random_id))
|
|
|
- .load::<models::World>(&self.conn)
|
|
|
- {
|
|
|
- Ok(mut items) => Ok(items.pop().unwrap()),
|
|
|
- Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Database error")),
|
|
|
- }
|
|
|
+ Ok(body.freeze())
|
|
|
}
|
|
|
-}
|
|
|
|
|
|
-pub struct RandomWorlds(pub u16);
|
|
|
+ pub async fn get_worlds(&self, num: usize) -> Result<Vec<World>, PgError> {
|
|
|
+ let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
|
|
|
|
|
|
-impl Message for RandomWorlds {
|
|
|
- type Result = io::Result<Vec<models::World>>;
|
|
|
-}
|
|
|
+ let worlds = FuturesUnordered::new();
|
|
|
|
|
|
-impl Handler<RandomWorlds> for DbExecutor {
|
|
|
- type Result = io::Result<Vec<models::World>>;
|
|
|
+ for _ in 0..num {
|
|
|
+ let w_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
|
|
|
+ worlds.push(self.query_one_world(w_id));
|
|
|
+ }
|
|
|
+
|
|
|
+ worlds.try_collect().await
|
|
|
+ }
|
|
|
|
|
|
- fn handle(&mut self, msg: RandomWorlds, _: &mut Self::Context) -> Self::Result {
|
|
|
- use crate::schema::world::dsl::*;
|
|
|
+ pub async fn update(&self, num: u16) -> Result<Vec<World>, PgError> {
|
|
|
+ let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
|
|
|
|
|
|
- let mut worlds = Vec::with_capacity(msg.0 as usize);
|
|
|
- for _ in 0..msg.0 {
|
|
|
- let w_id = self.rng.gen_range(1, 10_001);
|
|
|
- let w = match world.filter(id.eq(w_id)).load::<models::World>(&self.conn) {
|
|
|
- Ok(mut items) => items.pop().unwrap(),
|
|
|
- Err(_) => {
|
|
|
- return Err(io::Error::new(io::ErrorKind::Other, "Database error"));
|
|
|
+ let worlds = FuturesUnordered::new();
|
|
|
+
|
|
|
+ for _ in 0..num {
|
|
|
+ let id = (rng.gen::<u32>() % 10_000 + 1) as i32;
|
|
|
+ let w_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
|
|
|
+
|
|
|
+ worlds.push(self.query_one_world(w_id).map(move |res| match res {
|
|
|
+ Ok(mut world) => {
|
|
|
+ world.randomnumber = id;
|
|
|
+ Ok(world)
|
|
|
}
|
|
|
- };
|
|
|
- worlds.push(w)
|
|
|
+
|
|
|
+ Err(err) => Err(err),
|
|
|
+ }));
|
|
|
}
|
|
|
- Ok(worlds)
|
|
|
- }
|
|
|
-}
|
|
|
|
|
|
-pub struct UpdateWorld(pub u16);
|
|
|
+ let st = self.updates.get(&num).unwrap().clone();
|
|
|
|
|
|
-impl Message for UpdateWorld {
|
|
|
- type Result = io::Result<Vec<models::World>>;
|
|
|
-}
|
|
|
+ let worlds: Vec<World> = worlds.try_collect().await?;
|
|
|
|
|
|
-impl Handler<UpdateWorld> for DbExecutor {
|
|
|
- type Result = io::Result<Vec<models::World>>;
|
|
|
+ let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(num as usize * 3);
|
|
|
|
|
|
- fn handle(&mut self, msg: UpdateWorld, _: &mut Self::Context) -> Self::Result {
|
|
|
- use crate::schema::world::dsl::*;
|
|
|
+ for w in &worlds {
|
|
|
+ params.push(&w.id);
|
|
|
+ params.push(&w.randomnumber);
|
|
|
+ }
|
|
|
|
|
|
- let mut worlds = Vec::with_capacity(msg.0 as usize);
|
|
|
- for _ in 0..msg.0 {
|
|
|
- let w_id: i32 = self.rng.gen_range(1, 10_001);
|
|
|
- let mut w = match world.filter(id.eq(w_id)).load::<models::World>(&self.conn)
|
|
|
- {
|
|
|
- Ok(mut items) => items.pop().unwrap(),
|
|
|
- Err(_) => {
|
|
|
- return Err(io::Error::new(io::ErrorKind::Other, "Database error"));
|
|
|
- }
|
|
|
- };
|
|
|
- w.randomnumber = self.rng.gen_range(1, 10_001);
|
|
|
- worlds.push(w);
|
|
|
+ for w in &worlds {
|
|
|
+ params.push(&w.id);
|
|
|
}
|
|
|
- worlds.sort_by_key(|w| w.id);
|
|
|
-
|
|
|
- let _ = self.conn.transaction::<(), Error, _>(|| {
|
|
|
- for w in &worlds {
|
|
|
- let _ = diesel::update(world)
|
|
|
- .filter(id.eq(w.id))
|
|
|
- .set(randomnumber.eq(w.randomnumber))
|
|
|
- .execute(&self.conn);
|
|
|
- }
|
|
|
- Ok(())
|
|
|
- });
|
|
|
+
|
|
|
+ self.client.query(&st, ¶ms[..]).await?;
|
|
|
|
|
|
Ok(worlds)
|
|
|
}
|
|
|
-}
|
|
|
|
|
|
-pub struct TellFortune;
|
|
|
+ pub async fn tell_fortune(&self) -> Result<Vec<Fortune>, PgError> {
|
|
|
+ let mut items = vec![Fortune {
|
|
|
+ id: 0,
|
|
|
+ message: Cow::Borrowed("Additional fortune added at request time."),
|
|
|
+ }];
|
|
|
|
|
|
-impl Message for TellFortune {
|
|
|
- type Result = io::Result<Vec<models::Fortune>>;
|
|
|
-}
|
|
|
+ let fut = self.client.query_raw::<_, _, &[i32; 0]>(&self.fortune, &[]);
|
|
|
|
|
|
-impl Handler<TellFortune> for DbExecutor {
|
|
|
- type Result = io::Result<Vec<models::Fortune>>;
|
|
|
+ let stream = fut.await?;
|
|
|
+ pin!(stream);
|
|
|
|
|
|
- fn handle(&mut self, _: TellFortune, _: &mut Self::Context) -> Self::Result {
|
|
|
- use crate::schema::fortune::dsl::*;
|
|
|
+ while let Some(row) = stream.next().await {
|
|
|
+ let row = row?;
|
|
|
|
|
|
- match fortune.load::<models::Fortune>(&self.conn) {
|
|
|
- Ok(mut items) => {
|
|
|
- items.push(models::Fortune {
|
|
|
- id: 0,
|
|
|
- message: "Additional fortune added at request time.".to_string(),
|
|
|
- });
|
|
|
- items.sort_by(|it, next| it.message.cmp(&next.message));
|
|
|
- Ok(items)
|
|
|
- }
|
|
|
- Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
|
+ items.push(Fortune {
|
|
|
+ id: row.get(0),
|
|
|
+ message: Cow::Owned(row.get(1)),
|
|
|
+ });
|
|
|
}
|
|
|
+
|
|
|
+ items.sort_by(|it, next| it.message.cmp(&next.message));
|
|
|
+ Ok(items)
|
|
|
}
|
|
|
}
|