|
@@ -1,10 +1,10 @@
|
|
|
use std::fmt::Write;
|
|
|
use std::io;
|
|
|
|
|
|
-use actix::fut;
|
|
|
use actix::prelude::*;
|
|
|
use bytes::{Bytes, BytesMut};
|
|
|
-use futures::{stream, Future, Stream};
|
|
|
+use futures::stream::futures_unordered::FuturesUnordered;
|
|
|
+use futures::{FutureExt, StreamExt, TryStreamExt};
|
|
|
use rand::{thread_rng, Rng, ThreadRng};
|
|
|
use tokio_postgres::{connect, Client, NoTls, Statement};
|
|
|
|
|
@@ -13,9 +13,9 @@ use crate::utils::{Fortune, Writer};
|
|
|
|
|
|
/// Postgres interface
|
|
|
pub struct PgConnection {
|
|
|
- cl: Option<Client>,
|
|
|
- fortune: Option<Statement>,
|
|
|
- world: Option<Statement>,
|
|
|
+ cl: Client,
|
|
|
+ fortune: Statement,
|
|
|
+ world: Statement,
|
|
|
rng: ThreadRng,
|
|
|
}
|
|
|
|
|
@@ -24,47 +24,24 @@ impl Actor for PgConnection {
|
|
|
}
|
|
|
|
|
|
impl PgConnection {
|
|
|
- pub fn connect(db_url: &str) -> Addr<PgConnection> {
|
|
|
- let hs = connect(db_url, NoTls);
|
|
|
-
|
|
|
- PgConnection::create(move |ctx| {
|
|
|
- let act = PgConnection {
|
|
|
- cl: None,
|
|
|
- fortune: None,
|
|
|
- world: None,
|
|
|
- rng: thread_rng(),
|
|
|
- };
|
|
|
-
|
|
|
- hs.map_err(|_| panic!("can not connect to postgresql"))
|
|
|
- .into_actor(&act)
|
|
|
- .and_then(|(mut cl, conn), act, ctx| {
|
|
|
- ctx.wait(
|
|
|
- cl.prepare("SELECT id, message FROM fortune")
|
|
|
- .map_err(|_| ())
|
|
|
- .into_actor(act)
|
|
|
- .and_then(|st, act, _| {
|
|
|
- act.fortune = Some(st);
|
|
|
- fut::ok(())
|
|
|
- }),
|
|
|
- );
|
|
|
- ctx.wait(
|
|
|
- cl.prepare("SELECT id, randomnumber FROM world WHERE id=$1")
|
|
|
- .map_err(|_| ())
|
|
|
- .into_actor(act)
|
|
|
- .and_then(|st, act, _| {
|
|
|
- act.world = Some(st);
|
|
|
- fut::ok(())
|
|
|
- }),
|
|
|
- );
|
|
|
-
|
|
|
- act.cl = Some(cl);
|
|
|
- Arbiter::spawn(conn.map_err(|e| panic!("{}", e)));
|
|
|
- fut::ok(())
|
|
|
- })
|
|
|
- .wait(ctx);
|
|
|
-
|
|
|
- act
|
|
|
- })
|
|
|
+ pub async fn connect(db_url: &str) -> Result<Addr<PgConnection>, io::Error> {
|
|
|
+ let (cl, conn) = connect(db_url, NoTls)
|
|
|
+ .await
|
|
|
+ .expect("can not connect to postgresql");
|
|
|
+ actix_rt::spawn(conn.map(|res| panic!("{:?}", res)));
|
|
|
+
|
|
|
+ let fortune = cl.prepare("SELECT id, message FROM fortune").await.unwrap();
|
|
|
+ let world = cl
|
|
|
+ .prepare("SELECT id, randomnumber FROM world WHERE id=$1")
|
|
|
+ .await
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ Ok(PgConnection::create(move |_| PgConnection {
|
|
|
+ cl,
|
|
|
+ fortune,
|
|
|
+ world,
|
|
|
+ rng: thread_rng(),
|
|
|
+ }))
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -75,32 +52,29 @@ impl Message for RandomWorld {
|
|
|
}
|
|
|
|
|
|
impl Handler<RandomWorld> for PgConnection {
|
|
|
- type Result = ResponseFuture<Bytes, io::Error>;
|
|
|
+ type Result = ResponseFuture<Result<Bytes, io::Error>>;
|
|
|
|
|
|
fn handle(&mut self, _: RandomWorld, _: &mut Self::Context) -> Self::Result {
|
|
|
let random_id = self.rng.gen_range::<i32>(1, 10_001);
|
|
|
-
|
|
|
- Box::new(
|
|
|
- self.cl
|
|
|
- .as_mut()
|
|
|
- .unwrap()
|
|
|
- .query(self.world.as_ref().unwrap(), &[&random_id])
|
|
|
- .into_future()
|
|
|
- .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0)))
|
|
|
- .map(|(row, _)| {
|
|
|
- let row = row.unwrap();
|
|
|
- let mut body = BytesMut::with_capacity(33);
|
|
|
- serde_json::to_writer(
|
|
|
- Writer(&mut body),
|
|
|
- &World {
|
|
|
- id: row.get(0),
|
|
|
- randomnumber: row.get(1),
|
|
|
- },
|
|
|
- )
|
|
|
- .unwrap();
|
|
|
- body.freeze()
|
|
|
- }),
|
|
|
- )
|
|
|
+ let fut = self.cl.query_one(&self.world, &[&random_id]);
|
|
|
+
|
|
|
+ Box::pin(async move {
|
|
|
+ let row = fut
|
|
|
+ .await
|
|
|
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
|
|
|
+
|
|
|
+ let mut body = BytesMut::with_capacity(33);
|
|
|
+ serde_json::to_writer(
|
|
|
+ Writer(&mut body),
|
|
|
+ &World {
|
|
|
+ id: row.get(0),
|
|
|
+ randomnumber: row.get(1),
|
|
|
+ },
|
|
|
+ )
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ Ok(body.freeze())
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -111,32 +85,28 @@ impl Message for RandomWorlds {
|
|
|
}
|
|
|
|
|
|
impl Handler<RandomWorlds> for PgConnection {
|
|
|
- type Result = ResponseFuture<Vec<World>, io::Error>;
|
|
|
+ type Result = ResponseFuture<Result<Vec<World>, io::Error>>;
|
|
|
|
|
|
fn handle(&mut self, msg: RandomWorlds, _: &mut Self::Context) -> Self::Result {
|
|
|
- let mut worlds = Vec::with_capacity(msg.0 as usize);
|
|
|
+ let worlds = FuturesUnordered::new();
|
|
|
for _ in 0..msg.0 {
|
|
|
let w_id: i32 = self.rng.gen_range(1, 10_001);
|
|
|
worlds.push(
|
|
|
self.cl
|
|
|
- .as_mut()
|
|
|
- .unwrap()
|
|
|
- .query(self.world.as_ref().unwrap(), &[&w_id])
|
|
|
- .into_future()
|
|
|
- .map_err(|e| {
|
|
|
- io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0))
|
|
|
- })
|
|
|
- .map(|(row, _)| {
|
|
|
- let row = row.unwrap();
|
|
|
- World {
|
|
|
+ .query_one(&self.world, &[&w_id])
|
|
|
+ .map(|res| match res {
|
|
|
+ Err(e) => {
|
|
|
+ Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
|
|
|
+ }
|
|
|
+ Ok(row) => Ok(World {
|
|
|
id: row.get(0),
|
|
|
randomnumber: row.get(1),
|
|
|
- }
|
|
|
+ }),
|
|
|
}),
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- Box::new(stream::futures_unordered(worlds).collect())
|
|
|
+ Box::pin(worlds.try_collect())
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -147,62 +117,53 @@ impl Message for UpdateWorld {
|
|
|
}
|
|
|
|
|
|
impl Handler<UpdateWorld> for PgConnection {
|
|
|
- type Result = ResponseActFuture<Self, Vec<World>, io::Error>;
|
|
|
+ type Result = ResponseFuture<Result<Vec<World>, io::Error>>;
|
|
|
|
|
|
fn handle(&mut self, msg: UpdateWorld, _: &mut Self::Context) -> Self::Result {
|
|
|
- let mut worlds = Vec::with_capacity(msg.0 as usize);
|
|
|
+ let worlds = FuturesUnordered::new();
|
|
|
for _ in 0..msg.0 {
|
|
|
- let id = self.rng.gen_range(1, 10_001);
|
|
|
+ let id: i32 = self.rng.gen_range(1, 10_001);
|
|
|
let w_id: i32 = self.rng.gen_range(1, 10_001);
|
|
|
- worlds.push(
|
|
|
- self.cl
|
|
|
- .as_mut()
|
|
|
- .unwrap()
|
|
|
- .query(self.world.as_ref().unwrap(), &[&w_id])
|
|
|
- .into_future()
|
|
|
- .map_err(|e| {
|
|
|
- io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0))
|
|
|
- })
|
|
|
- .map(move |(row, _)| {
|
|
|
- let row = row.unwrap();
|
|
|
+ worlds.push(self.cl.query_one(&self.world, &[&w_id]).map(
|
|
|
+ move |res| match res {
|
|
|
+ Err(e) => {
|
|
|
+ Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
|
|
|
+ }
|
|
|
+ Ok(row) => {
|
|
|
let mut world = World {
|
|
|
id: row.get(0),
|
|
|
randomnumber: row.get(1),
|
|
|
};
|
|
|
world.randomnumber = id;
|
|
|
- world
|
|
|
- }),
|
|
|
- );
|
|
|
+ Ok(world)
|
|
|
+ }
|
|
|
+ },
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
- Box::new(
|
|
|
- stream::futures_unordered(worlds)
|
|
|
- .collect()
|
|
|
- .into_actor(self)
|
|
|
- .and_then(move |mut worlds, act, _| {
|
|
|
- let mut update = String::with_capacity(120 + 6 * msg.0 as usize);
|
|
|
- update
|
|
|
- .push_str("UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ");
|
|
|
-
|
|
|
- for w in &worlds {
|
|
|
- let _ = write!(&mut update, "({}, {}),", w.id, w.randomnumber);
|
|
|
- }
|
|
|
- worlds.sort_by_key(|w| w.id);
|
|
|
-
|
|
|
- update.pop();
|
|
|
- update
|
|
|
- .push_str(" ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id");
|
|
|
-
|
|
|
- act.cl
|
|
|
- .as_mut()
|
|
|
- .unwrap()
|
|
|
- .simple_query(&update)
|
|
|
- .collect()
|
|
|
- .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
|
|
|
- .into_actor(act)
|
|
|
- .map(|_, _, _| worlds)
|
|
|
- }),
|
|
|
- )
|
|
|
+ let cl = self.cl.clone();
|
|
|
+ Box::pin(async move {
|
|
|
+ let worlds: Vec<World> = worlds.try_collect().await?;
|
|
|
+
|
|
|
+ let mut update = String::with_capacity(120 + 6 * msg.0 as usize);
|
|
|
+ update.push_str(
|
|
|
+ "UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ",
|
|
|
+ );
|
|
|
+
|
|
|
+ for w in &worlds {
|
|
|
+ let _ = write!(&mut update, "({}, {}),", w.id, w.randomnumber);
|
|
|
+ }
|
|
|
+ update.pop();
|
|
|
+ update.push_str(
|
|
|
+ " ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id",
|
|
|
+ );
|
|
|
+
|
|
|
+ cl.simple_query(&update)
|
|
|
+ .await
|
|
|
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
|
|
|
+
|
|
|
+ Ok(worlds)
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -213,32 +174,32 @@ impl Message for TellFortune {
|
|
|
}
|
|
|
|
|
|
impl Handler<TellFortune> for PgConnection {
|
|
|
- type Result = ResponseFuture<Vec<Fortune>, io::Error>;
|
|
|
+ type Result = ResponseFuture<Result<Vec<Fortune>, io::Error>>;
|
|
|
|
|
|
fn handle(&mut self, _: TellFortune, _: &mut Self::Context) -> Self::Result {
|
|
|
- let items = vec![Fortune {
|
|
|
+ let mut items = vec![Fortune {
|
|
|
id: 0,
|
|
|
message: "Additional fortune added at request time.".to_string(),
|
|
|
}];
|
|
|
-
|
|
|
- Box::new(
|
|
|
- self.cl
|
|
|
- .as_mut()
|
|
|
- .unwrap()
|
|
|
- .query(self.fortune.as_ref().unwrap(), &[])
|
|
|
- .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
|
|
|
- .fold(items, move |mut items, row| {
|
|
|
- items.push(Fortune {
|
|
|
- id: row.get(0),
|
|
|
- message: row.get(1),
|
|
|
- });
|
|
|
- Ok::<_, io::Error>(items)
|
|
|
- })
|
|
|
- .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
|
|
|
- .map(|mut items| {
|
|
|
- items.sort_by(|it, next| it.message.cmp(&next.message));
|
|
|
- items
|
|
|
- }),
|
|
|
- )
|
|
|
+ let fut = self.cl.query_raw(&self.fortune, &[]);
|
|
|
+
|
|
|
+ Box::pin(async move {
|
|
|
+ let mut stream = fut
|
|
|
+ .await
|
|
|
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
|
|
|
+
|
|
|
+ while let Some(row) = stream.next().await {
|
|
|
+ let row = row.map_err(|e| {
|
|
|
+ io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
|
|
|
+ })?;
|
|
|
+ items.push(Fortune {
|
|
|
+ id: row.get(0),
|
|
|
+ message: row.get(1),
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ items.sort_by(|it, next| it.message.cmp(&next.message));
|
|
|
+ Ok(items)
|
|
|
+ })
|
|
|
}
|
|
|
}
|