Browse Source

update viz (#7927)

* update viz

* fix: runtime

* chore: tweak pg update

* chore: viz 0.4.8
Fangdun Tsai 2 years ago
parent
commit
e33304ad56

+ 3 - 3
frameworks/Rust/viz/Cargo.toml

@@ -24,10 +24,10 @@ path = "src/main_diesel.rs"
 required-features = ["diesel", "diesel-async", "sailfish"]
 required-features = ["diesel", "diesel-async", "sailfish"]
 
 
 [dependencies]
 [dependencies]
-viz = "0.4.7"
+viz = "0.4.8"
 hyper = "0.14.24"
 hyper = "0.14.24"
 atoi = "2.0"
 atoi = "2.0"
-once_cell = "1.17.0"
+once_cell = "1.17.1"
 serde = { version = "1.0", features = ["derive"] }
 serde = { version = "1.0", features = ["derive"] }
 nanorand = "0.7"
 nanorand = "0.7"
 thiserror = "1.0"
 thiserror = "1.0"
@@ -45,7 +45,7 @@ diesel = { version = "2.0.3", default-features = false, features = [
 ], optional = true }
 ], optional = true }
 diesel-async = { version = "0.2.0", default-features = false, features = [
 diesel-async = { version = "0.2.0", default-features = false, features = [
   "postgres",
   "postgres",
-  "deadpool",
+  "bb8",
 ], optional = true }
 ], optional = true }
 # diesel = { version = "1.4.8", features = ["postgres"], optional = true }
 # diesel = { version = "1.4.8", features = ["postgres"], optional = true }
 
 

+ 12 - 21
frameworks/Rust/viz/src/db_diesel.rs

@@ -2,7 +2,7 @@ use std::borrow::Cow;
 
 
 use diesel::prelude::*;
 use diesel::prelude::*;
 use diesel_async::{
 use diesel_async::{
-    pooled_connection::deadpool::{Object, Pool, PoolError},
+    pooled_connection::bb8::{Pool, RunError},
     AsyncPgConnection, RunQueryDsl,
     AsyncPgConnection, RunQueryDsl,
 };
 };
 use nanorand::{Rng, WyRand};
 use nanorand::{Rng, WyRand};
@@ -21,7 +21,7 @@ pub enum PgError {
     #[error(transparent)]
     #[error(transparent)]
     DieselError(#[from] diesel::result::Error),
     DieselError(#[from] diesel::result::Error),
     #[error(transparent)]
     #[error(transparent)]
-    PoolError(#[from] PoolError),
+    PoolError(#[from] RunError),
     #[error(transparent)]
     #[error(transparent)]
     RenderError(#[from] RenderError),
     RenderError(#[from] RenderError),
 }
 }
@@ -38,18 +38,11 @@ impl IntoResponse for PgError {
     }
     }
 }
 }
 
 
-pub async fn connect(
-    pool: Option<Pool<AsyncPgConnection>>,
-) -> Result<Object<AsyncPgConnection>, PgError> {
-    let obj = pool.ok_or(PgError::Missing)?.get().await?;
-    Ok(obj)
-}
-
 pub async fn get_worlds_by_limit(
 pub async fn get_worlds_by_limit(
-    pool: Option<Pool<AsyncPgConnection>>,
+    pool: Pool<AsyncPgConnection>,
     limit: i64,
     limit: i64,
 ) -> Result<Vec<World>, PgError> {
 ) -> Result<Vec<World>, PgError> {
-    let mut conn = connect(pool).await?;
+    let mut conn = pool.get().await?;
     let worlds = world::table
     let worlds = world::table
         .limit(limit)
         .limit(limit)
         .get_results::<World>(&mut conn)
         .get_results::<World>(&mut conn)
@@ -63,19 +56,19 @@ async fn _get_world(conn: &mut AsyncPgConnection, id: i32) -> Result<World, PgEr
 }
 }
 
 
 pub async fn get_world(
 pub async fn get_world(
-    pool: Option<Pool<AsyncPgConnection>>,
+    pool: Pool<AsyncPgConnection>,
     id: i32,
     id: i32,
 ) -> Result<World, PgError> {
 ) -> Result<World, PgError> {
-    let mut conn = connect(pool).await?;
+    let mut conn = pool.get().await?;
     _get_world(&mut conn, id).await
     _get_world(&mut conn, id).await
 }
 }
 
 
 pub async fn get_worlds(
 pub async fn get_worlds(
-    pool: Option<Pool<AsyncPgConnection>>,
+    pool: Pool<AsyncPgConnection>,
     mut rng: WyRand,
     mut rng: WyRand,
     count: u16,
     count: u16,
 ) -> Result<Vec<World>, PgError> {
 ) -> Result<Vec<World>, PgError> {
-    let mut conn = connect(pool).await?;
+    let mut conn = pool.get().await?;
 
 
     let mut worlds = Vec::<World>::with_capacity(count as usize);
     let mut worlds = Vec::<World>::with_capacity(count as usize);
 
 
@@ -89,11 +82,11 @@ pub async fn get_worlds(
 }
 }
 
 
 pub async fn update_worlds(
 pub async fn update_worlds(
-    pool: Option<Pool<AsyncPgConnection>>,
+    pool: Pool<AsyncPgConnection>,
     mut rng: WyRand,
     mut rng: WyRand,
     count: u16,
     count: u16,
 ) -> Result<Vec<World>, PgError> {
 ) -> Result<Vec<World>, PgError> {
-    let mut conn = connect(pool).await?;
+    let mut conn = pool.get().await?;
 
 
     let mut worlds = Vec::<World>::with_capacity(count as usize);
     let mut worlds = Vec::<World>::with_capacity(count as usize);
 
 
@@ -124,10 +117,8 @@ pub async fn update_worlds(
         .await
         .await
 }
 }
 
 
-pub async fn tell_fortune(
-    pool: Option<Pool<AsyncPgConnection>>,
-) -> Result<String, PgError> {
-    let mut conn = connect(pool).await?;
+pub async fn tell_fortune(pool: Pool<AsyncPgConnection>) -> Result<String, PgError> {
+    let mut conn = pool.get().await?;
 
 
     let mut items = fortune::table.load::<Fortune>(&mut conn).await?;
     let mut items = fortune::table.load::<Fortune>(&mut conn).await?;
 
 

+ 5 - 8
frameworks/Rust/viz/src/db_pg.rs

@@ -1,6 +1,6 @@
 use std::{borrow::Cow, fmt::Write, io, sync::Arc};
 use std::{borrow::Cow, fmt::Write, io, sync::Arc};
 
 
-use futures_util::{stream::FuturesUnordered, TryStreamExt};
+use futures_util::{stream::FuturesUnordered, TryFutureExt, TryStreamExt};
 use nanorand::{Rng, WyRand};
 use nanorand::{Rng, WyRand};
 use tokio_postgres::{connect, types::ToSql, Client, NoTls, Statement};
 use tokio_postgres::{connect, types::ToSql, Client, NoTls, Statement};
 use viz::{Error, IntoResponse, Response, StatusCode};
 use viz::{Error, IntoResponse, Response, StatusCode};
@@ -144,11 +144,10 @@ impl PgConnection {
             let id = rng.generate_range(RANGE);
             let id = rng.generate_range(RANGE);
             let rid = rng.generate_range(RANGE);
             let rid = rng.generate_range(RANGE);
 
 
-            worlds.push(async move {
-                let mut world = self.query_one_world(id).await?;
+            worlds.push(self.query_one_world(id).map_ok(move |mut world| {
                 world.randomnumber = rid;
                 world.randomnumber = rid;
-                Ok::<World, PgError>(world)
-            });
+                world
+            }));
         }
         }
 
 
         let worlds = worlds.try_collect::<Vec<World>>().await?;
         let worlds = worlds.try_collect::<Vec<World>>().await?;
@@ -165,9 +164,7 @@ impl PgConnection {
             params.push(&w.id);
             params.push(&w.id);
         }
         }
 
 
-        self.client
-            .query(&self.updates[num - 1], &params[..])
-            .await?;
+        self.client.query(&self.updates[num - 1], &params).await?;
 
 
         Ok(worlds)
         Ok(worlds)
     }
     }

+ 18 - 38
frameworks/Rust/viz/src/main_diesel.rs

@@ -1,13 +1,10 @@
 #[macro_use]
 #[macro_use]
 extern crate diesel;
 extern crate diesel;
 
 
-use std::{
-    convert::identity,
-    thread::{available_parallelism, spawn},
-};
+use std::{convert::identity, thread::available_parallelism};
 
 
 use diesel_async::{
 use diesel_async::{
-    pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager},
+    pooled_connection::{bb8::Pool, AsyncDieselConnectionManager},
     AsyncPgConnection,
     AsyncPgConnection,
 };
 };
 use nanorand::{Rng, WyRand};
 use nanorand::{Rng, WyRand};
@@ -34,7 +31,7 @@ static CACHED: OnceCell<Vec<World>> = OnceCell::new();
 
 
 async fn db(req: Request) -> Result<Response> {
 async fn db(req: Request) -> Result<Response> {
     let mut rng = req.state::<WyRand>().unwrap();
     let mut rng = req.state::<WyRand>().unwrap();
-    let pool = req.state::<Pool<AsyncPgConnection>>();
+    let pool = req.state::<Pool<AsyncPgConnection>>().unwrap();
 
 
     let random_id = rng.generate_range(RANGE);
     let random_id = rng.generate_range(RANGE);
 
 
@@ -47,7 +44,7 @@ async fn db(req: Request) -> Result<Response> {
 }
 }
 
 
 async fn fortunes(req: Request) -> Result<Response> {
 async fn fortunes(req: Request) -> Result<Response> {
-    let pool = req.state::<Pool<AsyncPgConnection>>();
+    let pool = req.state::<Pool<AsyncPgConnection>>().unwrap();
 
 
     let fortunes = tell_fortune(pool).await?;
     let fortunes = tell_fortune(pool).await?;
 
 
@@ -59,7 +56,7 @@ async fn fortunes(req: Request) -> Result<Response> {
 
 
 async fn queries(req: Request) -> Result<Response> {
 async fn queries(req: Request) -> Result<Response> {
     let rng = req.state::<WyRand>().unwrap();
     let rng = req.state::<WyRand>().unwrap();
-    let pool = req.state::<Pool<AsyncPgConnection>>();
+    let pool = req.state::<Pool<AsyncPgConnection>>().unwrap();
     let count = utils::get_query_param(req.query_string());
     let count = utils::get_query_param(req.query_string());
 
 
     let worlds = get_worlds(pool, rng, count).await?;
     let worlds = get_worlds(pool, rng, count).await?;
@@ -90,7 +87,7 @@ async fn cached_queries(req: Request) -> Result<Response> {
 
 
 async fn updates(req: Request) -> Result<Response> {
 async fn updates(req: Request) -> Result<Response> {
     let rng = req.state::<WyRand>().unwrap();
     let rng = req.state::<WyRand>().unwrap();
-    let pool = req.state::<Pool<AsyncPgConnection>>();
+    let pool = req.state::<Pool<AsyncPgConnection>>().unwrap();
     let count = utils::get_query_param(req.query_string());
     let count = utils::get_query_param(req.query_string());
 
 
     let worlds = update_worlds(pool, rng, count).await?;
     let worlds = update_worlds(pool, rng, count).await?;
@@ -101,30 +98,24 @@ async fn updates(req: Request) -> Result<Response> {
     Ok(res)
     Ok(res)
 }
 }
 
 
-async fn populate_cache(pool: Option<Pool<AsyncPgConnection>>) -> Result<()> {
+async fn populate_cache(pool: Pool<AsyncPgConnection>) -> Result<()> {
     let worlds = get_worlds_by_limit(pool, 10_000).await?;
     let worlds = get_worlds_by_limit(pool, 10_000).await?;
     CACHED.set(worlds).unwrap();
     CACHED.set(worlds).unwrap();
     Ok(())
     Ok(())
 }
 }
 
 
-fn main() {
-    let max = available_parallelism().map(|n| n.get()).unwrap_or(16) as usize;
-
-    let pool =
-        Pool::<AsyncPgConnection>::builder(AsyncDieselConnectionManager::new(DB_URL))
-            .max_size(max)
-            .wait_timeout(None)
-            .create_timeout(None)
-            .recycle_timeout(None)
-            .build()
-            .unwrap();
+#[tokio::main]
+async fn main() {
+    let max = available_parallelism().map(|n| n.get()).unwrap_or(16) as u32;
 
 
-    let rt = tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap();
+    let pool = Pool::<AsyncPgConnection>::builder()
+        .max_size(max)
+        .min_idle(Some(max))
+        .idle_timeout(None)
+        .build_unchecked(AsyncDieselConnectionManager::new(DB_URL));
 
 
-    rt.block_on(populate_cache(Some(pool.clone())))
+    populate_cache(pool.clone())
+        .await
         .expect("cache insert failed");
         .expect("cache insert failed");
 
 
     let rng = WyRand::new();
     let rng = WyRand::new();
@@ -140,18 +131,7 @@ fn main() {
             .with(State::new(rng)),
             .with(State::new(rng)),
     );
     );
 
 
-    for _ in 1..max {
-        let service = service.clone();
-        spawn(move || {
-            let rt = tokio::runtime::Builder::new_current_thread()
-                .enable_all()
-                .build()
-                .unwrap();
-            rt.block_on(serve(service));
-        });
-    }
-
-    rt.block_on(serve(service));
+    serve(service).await;
 }
 }
 
 
 async fn serve(service: ServiceMaker) {
 async fn serve(service: ServiceMaker) {