Browse Source

[Rust] viz optimize (#7759)

* chore: [Rust] viz optimize

* fix: range

* fix: remove smallvec

* chore(viz): 0.4.4
Fangdun Tsai 2 years ago
parent
commit
6a898f9e84

+ 8 - 8
frameworks/Rust/viz/Cargo.toml

@@ -19,18 +19,18 @@ path = "src/main_sqlx.rs"
 required-features = ["sqlx", "markup", "v_htmlescape"]
 
 [dependencies]
-viz = "0.4.3"
+viz = "0.4.4"
 hyper = "0.14"
 atoi = "2.0.0"
+once_cell = "1"
 serde = { version = "1.0", features = ["derive"] }
-tokio = { version = "1.21", features = ["macros", "rt-multi-thread"] }
+tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
 nanorand = "0.7"
 thiserror = "1.0"
-futures-util = "0.3.24"
-stretto = { version = "0.7", features = ["async"] }
+futures-util = "0.3"
 
-tokio-postgres = { version = "0.7.7", optional = true }
-sqlx = { version = "0.6.2", features = [
+tokio-postgres = { version = "0.7", optional = true }
+sqlx = { version = "0.6", features = [
   "postgres",
   "macros",
   "runtime-tokio-native-tls",
@@ -38,8 +38,8 @@ sqlx = { version = "0.6.2", features = [
 # diesel = { version = "1.4.8", features = ["postgres"], optional = true }
 
 yarte = { version = "0.15", features = ["bytes-buf", "json"], optional = true }
-markup = { version = "0.13.1", optional = true }
-v_htmlescape = { version = "0.15.7", optional = true }
+markup = { version = "0.13", optional = true }
+v_htmlescape = { version = "0.15", optional = true }
 
 [profile.release]
 lto = true

+ 21 - 22
frameworks/Rust/viz/src/db_pg.rs

@@ -1,5 +1,4 @@
-use std::fmt::Write;
-use std::io;
+use std::{borrow::Cow, fmt::Write, io};
 
 use futures_util::{stream::FuturesUnordered, TryFutureExt, TryStreamExt};
 use nanorand::{Rng, WyRand};
@@ -7,6 +6,7 @@ use tokio_postgres::{connect, types::ToSql, Client, NoTls, Statement};
 use viz::{Error, IntoResponse, Response, StatusCode};
 
 use crate::models::{Fortune, World};
+use crate::utils::RANGE;
 
 /// Postgres Error
 #[derive(Debug, thiserror::Error)]
@@ -41,7 +41,7 @@ pub struct PgConnection {
 }
 
 impl PgConnection {
-    pub async fn connect(db_url: &str) -> PgConnection {
+    pub async fn connect(db_url: &str) -> Self {
         let (client, conn) = connect(db_url, NoTls)
             .await
             .expect("can not connect to postgresql");
@@ -108,7 +108,7 @@ impl PgConnection {
     }
 
     pub async fn get_world(&self) -> Result<World, PgError> {
-        let random_id = (self.rng.clone().generate::<u32>() % 10_000 + 1) as i32;
+        let random_id = self.rng.clone().generate_range(RANGE);
         self.query_one_world(random_id).await
     }
 
@@ -116,7 +116,7 @@ impl PgConnection {
         let mut rng = self.rng.clone();
         (0..num)
             .map(|_| {
-                let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
+                let id = rng.generate_range(RANGE);
                 self.query_one_world(id)
             })
             .collect::<FuturesUnordered<_>>()
@@ -144,8 +144,8 @@ impl PgConnection {
 
         let worlds: Vec<World> = (0..num)
             .map(|_| {
-                let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-                let rid = (rng.generate::<u32>() % 10_000 + 1) as i32;
+                let id = rng.generate_range(RANGE);
+                let rid = rng.generate_range(RANGE);
                 self.query_one_world(id).map_ok(move |mut world| {
                     world.randomnumber = rid;
                     world
@@ -155,7 +155,8 @@ impl PgConnection {
             .try_collect()
             .await?;
 
-        let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(num as usize * 3);
+        let num = num as usize;
+        let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(num * 3);
 
         for w in &worlds {
             params.push(&w.id);
@@ -166,7 +167,7 @@ impl PgConnection {
             params.push(&w.id);
         }
 
-        let st = self.updates[(num as usize) - 1].clone();
+        let st = self.updates[num - 1].clone();
 
         self.client.query(&st, &params[..]).await?;
 
@@ -174,24 +175,22 @@ impl PgConnection {
     }
 
     pub async fn tell_fortune(&self) -> Result<Vec<Fortune>, PgError> {
-        let mut items = Vec::with_capacity(32);
+        let mut items = self
+            .client
+            .query(&self.fortune, &[])
+            .await?
+            .iter()
+            .map(|row| Fortune {
+                id: row.get(0),
+                message: Cow::Owned(row.get(1)),
+            })
+            .collect::<Vec<_>>();
 
         items.push(Fortune {
             id: 0,
-            message: "Additional fortune added at request time.".to_string(),
+            message: Cow::Borrowed("Additional fortune added at request time."),
         });
 
-        self.client
-            .query(&self.fortune, &[])
-            .await?
-            .iter()
-            .for_each(|row| {
-                items.push(Fortune {
-                    id: row.get(0),
-                    message: row.get(1),
-                })
-            });
-
         items.sort_by(|it, next| it.message.cmp(&next.message));
 
         Ok(items)

+ 1 - 0
frameworks/Rust/viz/src/main.rs

@@ -32,6 +32,7 @@ async fn main() -> Result<()> {
         .get("/json", json);
 
     server::builder()
+        .http1_pipeline_flush(true)
         .serve(ServiceMaker::from(app))
         .await
         .map_err(Error::normal)

+ 52 - 32
frameworks/Rust/viz/src/main_pg.rs

@@ -1,11 +1,14 @@
-use std::convert::identity;
-use std::sync::Arc;
+use std::{
+    convert::identity,
+    sync::Arc,
+    thread::{available_parallelism, spawn},
+};
 
 use nanorand::{Rng, WyRand};
-use stretto::AsyncCache;
+use once_cell::sync::OnceCell;
 use viz::{
-    header::SERVER, types::State, Error, HandlerExt, Request, RequestExt, Response,
-    ResponseExt, Result, Router, ServiceMaker,
+    header::SERVER, types::State, Error, Request, RequestExt, Response, ResponseExt,
+    Result, Router, ServiceMaker,
 };
 use yarte::ywrite_html;
 
@@ -15,6 +18,11 @@ mod server;
 mod utils;
 
 use db_pg::{PgConnection, PgError};
+use utils::{HDR_SERVER, RANGE};
+
+const DB_URL: &str =
+    "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+static CACHED: OnceCell<Vec<models::World>> = OnceCell::new();
 
 async fn db(req: Request) -> Result<Response> {
     let db = req.state::<Arc<PgConnection>>().ok_or(PgError::Connect)?;
@@ -22,7 +30,7 @@ async fn db(req: Request) -> Result<Response> {
     let world = db.get_world().await?;
 
     let mut res = Response::json(world)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
@@ -35,7 +43,7 @@ async fn fortunes(req: Request) -> Result<Response> {
     ywrite_html!(buf, "{{> fortune }}");
 
     let mut res = Response::html(buf);
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
@@ -46,28 +54,24 @@ async fn queries(req: Request) -> Result<Response> {
     let worlds = db.get_worlds(count).await?;
 
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
 async fn cached_queries(req: Request) -> Result<Response> {
-    let cahced = req
-        .state::<AsyncCache<i32, models::World>>()
-        .ok_or(PgError::Connect)?;
-
     let count = utils::get_query_param(req.query_string());
     let mut rng = WyRand::new();
 
     let worlds = (0..count)
         .map(|_| {
-            let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-            cahced.get(&id).map(|v| v.read())
+            let id = rng.generate_range(RANGE) as usize;
+            CACHED.get()?.get(id)
         })
         .filter_map(identity)
         .collect::<Vec<_>>();
 
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
@@ -78,37 +82,53 @@ async fn updates(req: Request) -> Result<Response> {
     let worlds = db.update(count).await?;
 
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
-    const DB_URL: &str =
-        "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+async fn populate_cache() -> Result<(), Error> {
+    let conn = PgConnection::connect(DB_URL).await;
+    let worlds = conn.get_worlds_by_limit(10_000).await?;
+    CACHED.set(worlds).unwrap();
+    Ok(())
+}
 
-    let pg_conn = Arc::new(PgConnection::connect(DB_URL).await);
+fn main() {
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    rt.block_on(async {
+        populate_cache().await.expect("cache insert failed");
+    });
+
+    for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) {
+        spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve());
+        });
+    }
 
-    let cached = AsyncCache::new(10_000, 1e6 as i64, tokio::spawn).unwrap();
+    rt.block_on(serve());
+}
 
-    {
-        let worlds = pg_conn.get_worlds_by_limit(10_000).await?;
-        for w in worlds {
-            cached.insert(w.id, w, 1).await;
-        }
-        cached.wait().await.expect("cache insert failed");
-    }
+async fn serve() {
+    let conn = PgConnection::connect(DB_URL).await;
 
     let app = Router::new()
         .get("/db", db)
         .get("/fortunes", fortunes)
         .get("/queries", queries)
         .get("/updates", updates)
-        .with(State::new(pg_conn))
-        .get("/cached_queries", cached_queries.with(State::new(cached)));
+        .with(State::new(Arc::new(conn)))
+        .get("/cached_queries", cached_queries);
 
     server::builder()
         .serve(ServiceMaker::from(app))
         .await
-        .map_err(Error::normal)
+        .unwrap()
 }

+ 82 - 89
frameworks/Rust/viz/src/main_sqlx.rs

@@ -1,10 +1,10 @@
-use std::convert::identity;
+use std::{borrow::Cow, convert::identity};
 
 use nanorand::{Rng, WyRand};
-use sqlx::Arguments;
-use stretto::AsyncCache;
+use once_cell::sync::OnceCell;
+use sqlx::{postgres::PgRow, Arguments, Pool, Row};
 use viz::{
-    get, header::SERVER, types::State, BytesMut, Error, IntoHandler, Response,
+    header::SERVER, types::State, BytesMut, Error, Request, RequestExt, Response,
     ResponseExt, Result, Router, ServiceMaker,
 };
 
@@ -13,37 +13,49 @@ mod models_sqlx;
 mod server;
 mod utils;
 
-use db_sqlx::{Counter, DatabaseConnection, PgArguments, PgError, PgPoolOptions};
+use db_sqlx::{
+    Counter, DatabaseConnection, PgArguments, PgError, PgPoolOptions, Postgres,
+};
 use models_sqlx::{Fortune, World};
+use utils::{HDR_SERVER, RANGE};
+
+const DB_URL: &str =
+    "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+static CACHED: OnceCell<Vec<World>> = OnceCell::new();
+
+async fn db(mut req: Request) -> Result<Response> {
+    let (State(mut rng), DatabaseConnection(mut conn)) =
+        req.extract::<(State<WyRand>, DatabaseConnection)>().await?;
 
-async fn db(
-    State(mut rng): State<WyRand>,
-    DatabaseConnection(mut conn): DatabaseConnection,
-) -> Result<Response> {
-    let random_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-    let mut args = PgArguments::default();
-    args.add(random_id);
+    let random_id = rng.generate_range(RANGE);
 
     let world: World =
-        sqlx::query_as_with("SELECT id, randomnumber FROM World WHERE id = $1", args)
+        sqlx::query_as("SELECT id, randomnumber FROM World WHERE id = $1")
+            .bind(random_id)
             .fetch_one(&mut conn)
             .await
             .map_err(PgError)?;
 
     let mut res = Response::json(world)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
-async fn fortunes(DatabaseConnection(mut conn): DatabaseConnection) -> Result<Response> {
-    let mut items: Vec<Fortune> = sqlx::query_as("SELECT * FROM Fortune")
+async fn fortunes(mut req: Request) -> Result<Response> {
+    let DatabaseConnection(mut conn) = req.extract::<DatabaseConnection>().await?;
+
+    let mut items: Vec<Fortune> = sqlx::query("SELECT * FROM Fortune")
+        .map(|row: PgRow| Fortune {
+            id: row.get(0),
+            message: Cow::Owned(row.get(1)),
+        })
         .fetch_all(&mut conn)
         .await
         .map_err(PgError)?;
 
     items.push(Fortune {
         id: 0,
-        message: "Additional fortune added at request time.".to_string(),
+        message: Cow::Borrowed("Additional fortune added at request time."),
     });
 
     items.sort_by(|it, next| it.message.cmp(&next.message));
@@ -52,83 +64,74 @@ async fn fortunes(DatabaseConnection(mut conn): DatabaseConnection) -> Result<Re
     buf.extend(FortunesTemplate { items }.to_string().as_bytes());
 
     let mut res = Response::html(buf.freeze());
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
-async fn queries(
-    Counter(count): Counter,
-    State(mut rng): State<WyRand>,
-    DatabaseConnection(mut conn): DatabaseConnection,
-) -> Result<Response> {
+async fn queries(mut req: Request) -> Result<Response> {
+    let (Counter(count), State(mut rng), DatabaseConnection(mut conn)) = req
+        .extract::<(Counter, State<WyRand>, DatabaseConnection)>()
+        .await?;
+
     let mut worlds = Vec::with_capacity(count as usize);
 
     for _ in 0..count {
-        let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-
-        let mut args = PgArguments::default();
-        args.add(id);
+        let id = rng.generate_range(RANGE);
 
-        let world = sqlx::query_as_with::<_, World, _>(
-            "SELECT id, randomnumber FROM World WHERE id = $1",
-            args,
-        )
-        .fetch_one(&mut conn)
-        .await
-        .map_err(PgError)?;
+        let world: World =
+            sqlx::query_as("SELECT id, randomnumber FROM World WHERE id = $1")
+                .bind(id)
+                .fetch_one(&mut conn)
+                .await
+                .map_err(PgError)?;
 
         worlds.push(world);
     }
 
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
-async fn cached_queries(
-    Counter(count): Counter,
-    State(mut rng): State<WyRand>,
-    State(cached): State<AsyncCache<i32, World>>,
-) -> Result<Response> {
+async fn cached_queries(mut req: Request) -> Result<Response> {
+    let (Counter(count), State(mut rng)) =
+        req.extract::<(Counter, State<WyRand>)>().await?;
+
     let worlds = (0..count)
         .map(|_| {
-            let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-            cached.get(&id).map(|v| v.read())
+            let id = rng.generate_range(RANGE) as usize;
+            CACHED.get()?.get(id)
         })
         .filter_map(identity)
         .collect::<Vec<_>>();
 
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut().insert(SERVER, HDR_SERVER);
     Ok(res)
 }
 
-async fn updates(
-    Counter(count): Counter,
-    State(mut rng): State<WyRand>,
-    DatabaseConnection(mut conn): DatabaseConnection,
-) -> Result<Response> {
+async fn updates(mut req: Request) -> Result<Response> {
+    let (Counter(count), State(mut rng), DatabaseConnection(mut conn)) = req
+        .extract::<(Counter, State<WyRand>, DatabaseConnection)>()
+        .await?;
+
     let mut worlds = Vec::with_capacity(count as usize);
 
     for _ in 0..count {
-        let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-
-        let mut args = PgArguments::default();
-        args.add(id);
+        let id = rng.generate_range(RANGE);
 
-        let world = sqlx::query_as_with::<_, World, _>(
-            "SELECT id, randomnumber FROM World WHERE id = $1",
-            args,
-        )
-        .fetch_one(&mut conn)
-        .await
-        .map_err(PgError)?;
+        let world: World =
+            sqlx::query_as("SELECT id, randomnumber FROM World WHERE id = $1")
+                .bind(id)
+                .fetch_one(&mut conn)
+                .await
+                .map_err(PgError)?;
 
         worlds.push(world);
     }
 
     for w in &mut worlds {
-        let randomnumber = (rng.generate::<u32>() % 10_000 + 1) as i32;
+        let randomnumber = rng.generate_range(RANGE);
         let mut args = PgArguments::default();
         args.add(randomnumber);
         args.add(w.id);
@@ -145,11 +148,20 @@ async fn updates(
     Ok(res)
 }
 
+async fn populate_cache(pool: Pool<Postgres>) -> Result<(), Error> {
+    let mut conn = pool.acquire().await.map_err(Error::normal)?;
+
+    let worlds: Vec<World> = sqlx::query_as("SELECT * FROM World LIMIT $1")
+        .bind(10_000)
+        .fetch_all(&mut conn)
+        .await
+        .map_err(PgError)?;
+    CACHED.set(worlds).unwrap();
+    Ok(())
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
-    const DB_URL: &str =
-        "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
-
     let pool = PgPoolOptions::new()
         .max_connections(56)
         .min_connections(56)
@@ -157,36 +169,17 @@ async fn main() -> Result<()> {
         .await
         .map_err(PgError)?;
 
-    let rng = WyRand::new();
-
-    let cached = AsyncCache::new(10_000, 1e6 as i64, tokio::spawn).unwrap();
-
-    {
-        let mut conn = pool.acquire().await.map_err(PgError)?;
-        let mut args = PgArguments::default();
-        args.add(10_000);
-        let worlds: Vec<World> =
-            sqlx::query_as_with("SELECT id, randomnumber FROM World LIMIT $1", args)
-                .fetch_all(&mut conn)
-                .await
-                .map_err(PgError)?;
+    populate_cache(pool.clone()).await?;
 
-        for w in worlds {
-            cached.insert(w.id, w, 1).await;
-        }
-        cached.wait().await.expect("cache insert failed");
-    }
+    let rng = WyRand::new();
 
     let app = Router::new()
-        .route("/db", get(db.into_handler()))
-        .route("/fortunes", get(fortunes.into_handler()))
-        .route("/queries", get(queries.into_handler()))
-        .route("/updates", get(updates.into_handler()))
+        .get("/db", db)
+        .get("/fortunes", fortunes)
+        .get("/queries", queries)
+        .get("/updates", updates)
         .with(State::new(pool))
-        .route(
-            "/cached_queries",
-            get(cached_queries.into_handler()).with(State::new(cached)),
-        )
+        .get("/cached_queries", cached_queries)
         .with(State::new(rng));
 
     server::builder()

+ 3 - 1
frameworks/Rust/viz/src/models.rs

@@ -1,3 +1,5 @@
+use std::borrow::Cow;
+
 use serde::Serialize;
 
 #[derive(Clone, Copy, Serialize, Debug, yarte::Serialize)]
@@ -9,5 +11,5 @@ pub struct World {
 #[derive(Serialize, Debug)]
 pub struct Fortune {
     pub id: i32,
-    pub message: String,
+    pub message: Cow<'static, str>,
 }

+ 3 - 1
frameworks/Rust/viz/src/models_sqlx.rs

@@ -1,3 +1,5 @@
+use std::borrow::Cow;
+
 use serde::{Deserialize, Serialize};
 use sqlx::FromRow;
 
@@ -10,5 +12,5 @@ pub struct World {
 #[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
 pub struct Fortune {
     pub id: i32,
-    pub message: String,
+    pub message: Cow<'static, str>,
 }

+ 5 - 2
frameworks/Rust/viz/src/utils.rs

@@ -1,8 +1,11 @@
-use std::cmp;
+use std::{ops::Range};
 
 use atoi::FromRadix10;
 use viz::header::HeaderValue;
 
+#[allow(dead_code)]
+pub const RANGE: Range<i32> = 1..10_001;
+
 pub const HDR_SERVER: HeaderValue = HeaderValue::from_static("VIZ");
 
 #[allow(dead_code)]
@@ -13,5 +16,5 @@ pub fn get_query_param(query: Option<&str>) -> u16 {
     } else {
         1
     };
-    cmp::min(500, cmp::max(1, q))
+    q.clamp(1, 500)
 }

+ 1 - 1
frameworks/Rust/viz/viz-pg.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.64.0
+FROM rust:1.65.0
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 

+ 1 - 1
frameworks/Rust/viz/viz-sqlx.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.64.0
+FROM rust:1.65.0
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 

+ 1 - 1
frameworks/Rust/viz/viz.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.64.0
+FROM rust:1.65.0
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++