Browse Source

[Rust] viz improves (#7825)

* [Rust] viz improves

* feat(viz): add diesel case

* feat(viz): improve sqlx case

* chore(viz): improve db case

* chore(viz): improve db case

* chore(viz): improve pg case
Fangdun Tsai 2 years ago
parent
commit
381e0e9f6c

+ 16 - 4
frameworks/Rust/viz/Cargo.toml

@@ -18,28 +18,40 @@ name = "viz-sqlx"
 path = "src/main_sqlx.rs"
 path = "src/main_sqlx.rs"
 required-features = ["sqlx", "markup", "v_htmlescape"]
 required-features = ["sqlx", "markup", "v_htmlescape"]
 
 
+[[bin]]
+name = "viz-diesel"
+path = "src/main_diesel.rs"
+required-features = ["diesel", "diesel-async", "sailfish"]
+
 [dependencies]
 [dependencies]
 viz = "0.4.4"
 viz = "0.4.4"
 hyper = "0.14"
 hyper = "0.14"
-atoi = "2.0.0"
-once_cell = "1"
+atoi = "2.0"
+once_cell = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde = { version = "1.0", features = ["derive"] }
-tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
 nanorand = "0.7"
 nanorand = "0.7"
 thiserror = "1.0"
 thiserror = "1.0"
 futures-util = "0.3"
 futures-util = "0.3"
 
 
+tokio = { version = "1.0", features = ["full"] }
 tokio-postgres = { version = "0.7", optional = true }
 tokio-postgres = { version = "0.7", optional = true }
 sqlx = { version = "0.6", features = [
 sqlx = { version = "0.6", features = [
   "postgres",
   "postgres",
   "macros",
   "macros",
   "runtime-tokio-native-tls",
   "runtime-tokio-native-tls",
 ], optional = true }
 ], optional = true }
-# diesel = { version = "1.4.8", features = ["postgres"], optional = true }
+diesel = { version = "2.0", default-features = false, features = [
+  "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
+], optional = true }
+diesel-async = { version = "0.2.0", default-features = false, features = [
+  "postgres",
+  "deadpool",
+], optional = true }
 
 
 yarte = { version = "0.15", features = ["bytes-buf", "json"], optional = true }
 yarte = { version = "0.15", features = ["bytes-buf", "json"], optional = true }
 markup = { version = "0.13", optional = true }
 markup = { version = "0.13", optional = true }
 v_htmlescape = { version = "0.15", optional = true }
 v_htmlescape = { version = "0.15", optional = true }
+sailfish = { version = "0.5", optional = true }
 
 
 [profile.release]
 [profile.release]
 lto = true
 lto = true

+ 23 - 1
frameworks/Rust/viz/benchmark_config.json

@@ -64,7 +64,29 @@
         "display_name": "Viz [Postgresql - SQLx]",
         "display_name": "Viz [Postgresql - SQLx]",
         "notes": "",
         "notes": "",
         "versus": "None"
         "versus": "None"
+      },
+      "diesel": {
+        "db_url": "/db",
+        "fortune_url": "/fortunes",
+        "query_url": "/queries?q=",
+        "update_url": "/updates?q=",
+        "cached_query_url": "/cached_queries?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Fullstack",
+        "database": "postgres",
+        "framework": "Viz",
+        "language": "Rust",
+        "flavor": "None",
+        "orm": "Full",
+        "platform": "Rust",
+        "webserver": "Hyper",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Viz [Postgresql - Diesel]",
+        "notes": "",
+        "versus": "None"
       }
       }
     }
     }
   ]
   ]
-}
+}

+ 15 - 0
frameworks/Rust/viz/config.toml

@@ -43,3 +43,18 @@ orm = "Raw"
 platform = "None"
 platform = "None"
 webserver = "viz"
 webserver = "viz"
 versus = ""
 versus = ""
+
+[diesel]
+urls.db = "/db"
+urls.query = "/queries?q="
+urls.update = "/updates?q="
+urls.fortune = "/fortunes"
+approach = "Realistic"
+classification = "Fullstack"
+database = "Postgres"
+database_os = "Linux"
+os = "Linux"
+orm = "Full"
+platform = "None"
+webserver = "viz"
+versus = ""

+ 143 - 0
frameworks/Rust/viz/src/db_diesel.rs

@@ -0,0 +1,143 @@
+use std::borrow::Cow;
+
+use diesel::prelude::*;
+use diesel_async::{
+    pooled_connection::deadpool::{Object, Pool, PoolError},
+    AsyncPgConnection, RunQueryDsl,
+};
+use nanorand::{Rng, WyRand};
+use sailfish::{RenderError, TemplateOnce};
+use viz::{Error, IntoResponse, Response, StatusCode};
+
+use crate::models_diesel::*;
+use crate::schema::*;
+use crate::RANGE;
+
+/// Postgres Error
+#[derive(Debug, thiserror::Error)]
+pub enum PgError {
+    #[error("missing pool")]
+    Missing,
+    #[error(transparent)]
+    DieselError(#[from] diesel::result::Error),
+    #[error(transparent)]
+    PoolError(#[from] PoolError),
+    #[error(transparent)]
+    RenderError(#[from] RenderError),
+}
+
+impl From<PgError> for Error {
+    fn from(e: PgError) -> Self {
+        Error::Responder(e.into_response())
+    }
+}
+
+impl IntoResponse for PgError {
+    fn into_response(self) -> Response {
+        (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
+    }
+}
+
+pub async fn connect(
+    pool: Option<Pool<AsyncPgConnection>>,
+) -> Result<Object<AsyncPgConnection>, PgError> {
+    let obj = pool.ok_or(PgError::Missing)?.get().await?;
+    Ok(obj)
+}
+
+pub async fn get_worlds_by_limit(
+    pool: Option<Pool<AsyncPgConnection>>,
+    limit: i64,
+) -> Result<Vec<World>, PgError> {
+    let mut conn = connect(pool).await?;
+    let worlds = world::table
+        .limit(limit)
+        .get_results::<World>(&mut conn)
+        .await?;
+    Ok(worlds)
+}
+
+async fn _get_world(conn: &mut AsyncPgConnection, id: i32) -> Result<World, PgError> {
+    let world = world::table.find(id).first(conn).await?;
+    Ok(world)
+}
+
+pub async fn get_world(
+    pool: Option<Pool<AsyncPgConnection>>,
+    id: i32,
+) -> Result<World, PgError> {
+    let mut conn = connect(pool).await?;
+    _get_world(&mut conn, id).await
+}
+
+pub async fn get_worlds(
+    pool: Option<Pool<AsyncPgConnection>>,
+    mut rng: WyRand,
+    count: u16,
+) -> Result<Vec<World>, PgError> {
+    let mut conn = connect(pool).await?;
+
+    let mut worlds = Vec::<World>::with_capacity(count as usize);
+
+    for _ in 0..count {
+        let id = rng.generate_range(RANGE);
+        let w = _get_world(&mut conn, id).await?;
+        worlds.push(w);
+    }
+
+    Ok(worlds)
+}
+
+pub async fn update_worlds(
+    pool: Option<Pool<AsyncPgConnection>>,
+    mut rng: WyRand,
+    count: u16,
+) -> Result<Vec<World>, PgError> {
+    let mut conn = connect(pool).await?;
+
+    let mut worlds = Vec::<World>::with_capacity(count as usize);
+
+    for _ in 0..count {
+        let id = rng.generate_range(RANGE);
+        let rid = rng.generate_range(RANGE);
+        let mut w = _get_world(&mut conn, id).await?;
+        w.randomnumber = rid;
+        worlds.push(w);
+    }
+
+    worlds.sort_by_key(|w| w.id);
+
+    conn.build_transaction()
+        .run(move |conn| {
+            Box::pin(async move {
+                for w in &worlds {
+                    diesel::update(world::table)
+                        .filter(world::id.eq(w.id))
+                        .set(world::randomnumber.eq(w.randomnumber))
+                        .execute(conn)
+                        .await?;
+                }
+
+                Ok::<_, PgError>(worlds)
+            })
+        })
+        .await
+}
+
+pub async fn tell_fortune(
+    pool: Option<Pool<AsyncPgConnection>>,
+) -> Result<String, PgError> {
+    let mut conn = connect(pool).await?;
+
+    let mut items = fortune::table.load::<Fortune>(&mut conn).await?;
+
+    items.push(Fortune {
+        id: 0,
+        message: Cow::Borrowed("Additional fortune added at request time."),
+    });
+    items.sort_by(|it, next| it.message.cmp(&next.message));
+
+    let html = Fortunes::new(items).render_once()?;
+
+    Ok(html)
+}

+ 47 - 45
frameworks/Rust/viz/src/db_pg.rs

@@ -1,6 +1,6 @@
-use std::{borrow::Cow, fmt::Write, io};
+use std::{borrow::Cow, fmt::Write, io, sync::Arc};
 
 
-use futures_util::{stream::FuturesUnordered, TryFutureExt, TryStreamExt};
+use futures_util::{stream::FuturesUnordered, TryStreamExt};
 use nanorand::{Rng, WyRand};
 use nanorand::{Rng, WyRand};
 use tokio_postgres::{connect, types::ToSql, Client, NoTls, Statement};
 use tokio_postgres::{connect, types::ToSql, Client, NoTls, Statement};
 use viz::{Error, IntoResponse, Response, StatusCode};
 use viz::{Error, IntoResponse, Response, StatusCode};
@@ -49,7 +49,7 @@ impl PgConnection {
         // Spawn connection
         // Spawn connection
         tokio::spawn(async move {
         tokio::spawn(async move {
             if let Err(error) = conn.await {
             if let Err(error) = conn.await {
-                eprintln!("Connection error: {}", error);
+                eprintln!("Connection error: {error}");
             }
             }
         });
         });
 
 
@@ -97,14 +97,11 @@ impl PgConnection {
 
 
 impl PgConnection {
 impl PgConnection {
     async fn query_one_world(&self, id: i32) -> Result<World, PgError> {
     async fn query_one_world(&self, id: i32) -> Result<World, PgError> {
-        self.client
-            .query_one(&self.world, &[&id])
-            .await
-            .map(|row| World {
-                id: row.get(0),
-                randomnumber: row.get(1),
-            })
-            .map_err(PgError::Pg)
+        let row = self.client.query_one(&self.world, &[&id]).await?;
+        Ok(World {
+            id: row.get(0),
+            randomnumber: row.get(1),
+        })
     }
     }
 
 
     pub async fn get_world(&self) -> Result<World, PgError> {
     pub async fn get_world(&self) -> Result<World, PgError> {
@@ -114,49 +111,50 @@ impl PgConnection {
 
 
     pub async fn get_worlds(&self, num: u16) -> Result<Vec<World>, PgError> {
     pub async fn get_worlds(&self, num: u16) -> Result<Vec<World>, PgError> {
         let mut rng = self.rng.clone();
         let mut rng = self.rng.clone();
-        (0..num)
-            .map(|_| {
-                let id = rng.generate_range(RANGE);
-                self.query_one_world(id)
-            })
-            .collect::<FuturesUnordered<_>>()
-            .try_collect()
-            .await
+
+        let worlds = FuturesUnordered::new();
+
+        for _ in 0..num {
+            let id = rng.generate_range(RANGE);
+            worlds.push(self.query_one_world(id));
+        }
+
+        worlds.try_collect().await
     }
     }
 
 
     pub async fn get_worlds_by_limit(&self, limit: i64) -> Result<Vec<World>, PgError> {
     pub async fn get_worlds_by_limit(&self, limit: i64) -> Result<Vec<World>, PgError> {
-        self.client
+        Ok(self
+            .client
             .query("SELECT * FROM world LIMIT $1", &[&limit])
             .query("SELECT * FROM world LIMIT $1", &[&limit])
-            .await
-            .map(|rows| {
-                rows.iter()
-                    .map(|row| World {
-                        id: row.get(0),
-                        randomnumber: row.get(1),
-                    })
-                    .collect()
+            .await?
+            .iter()
+            .map(|row| World {
+                id: row.get(0),
+                randomnumber: row.get(1),
             })
             })
-            .map_err(PgError::Pg)
+            .collect())
     }
     }
 
 
     pub async fn update(&self, num: u16) -> Result<Vec<World>, PgError> {
     pub async fn update(&self, num: u16) -> Result<Vec<World>, PgError> {
         let mut rng = self.rng.clone();
         let mut rng = self.rng.clone();
 
 
-        let worlds: Vec<World> = (0..num)
-            .map(|_| {
-                let id = rng.generate_range(RANGE);
-                let rid = rng.generate_range(RANGE);
-                self.query_one_world(id).map_ok(move |mut world| {
-                    world.randomnumber = rid;
-                    world
-                })
-            })
-            .collect::<FuturesUnordered<_>>()
-            .try_collect()
-            .await?;
+        let worlds = FuturesUnordered::new();
+
+        for _ in 0..num {
+            let id = rng.generate_range(RANGE);
+            let rid = rng.generate_range(RANGE);
+
+            worlds.push(async move {
+                let mut world = self.query_one_world(id).await?;
+                world.randomnumber = rid;
+                Ok::<World, PgError>(world)
+            });
+        }
+
+        let worlds = worlds.try_collect::<Vec<World>>().await?;
 
 
         let num = num as usize;
         let num = num as usize;
-        let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(num * 3);
+        let mut params = Vec::<&(dyn ToSql + Sync)>::with_capacity(num * 3);
 
 
         for w in &worlds {
         for w in &worlds {
             params.push(&w.id);
             params.push(&w.id);
@@ -167,9 +165,9 @@ impl PgConnection {
             params.push(&w.id);
             params.push(&w.id);
         }
         }
 
 
-        let st = self.updates[num - 1].clone();
-
-        self.client.query(&st, &params[..]).await?;
+        self.client
+            .query(&self.updates[num - 1], &params[..])
+            .await?;
 
 
         Ok(worlds)
         Ok(worlds)
     }
     }
@@ -196,3 +194,7 @@ impl PgConnection {
         Ok(items)
         Ok(items)
     }
     }
 }
 }
+
+pub fn get_conn(pool: Option<Arc<PgConnection>>) -> Result<Arc<PgConnection>, PgError> {
+    pool.ok_or(PgError::Connect)
+}

+ 84 - 8
frameworks/Rust/viz/src/db_sqlx.rs

@@ -1,17 +1,21 @@
-use sqlx::Pool;
+use std::borrow::Cow;
+
+use nanorand::{Rng, WyRand};
+
+pub use sqlx::{
+    pool::PoolConnection,
+    postgres::{PgArguments, PgPool, PgPoolOptions, PgRow},
+    Arguments, Pool, Postgres, Row,
+};
 
 
 use viz::{
 use viz::{
     async_trait, Error, FromRequest, IntoResponse, Request, RequestExt, Response,
     async_trait, Error, FromRequest, IntoResponse, Request, RequestExt, Response,
     StatusCode,
     StatusCode,
 };
 };
 
 
+use crate::models_sqlx::*;
 use crate::utils::get_query_param;
 use crate::utils::get_query_param;
-
-pub use sqlx::{
-    pool::PoolConnection,
-    postgres::{PgArguments, PgPoolOptions},
-    Postgres,
-};
+use crate::RANGE;
 
 
 pub struct DatabaseConnection(pub PoolConnection<Postgres>);
 pub struct DatabaseConnection(pub PoolConnection<Postgres>);
 
 
@@ -20,7 +24,7 @@ impl FromRequest for DatabaseConnection {
     type Error = PgError;
     type Error = PgError;
 
 
     async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
     async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
-        req.state::<Pool<Postgres>>()
+        req.state::<PgPool>()
             .ok_or(PgError(sqlx::Error::Io(std::io::Error::from(
             .ok_or(PgError(sqlx::Error::Io(std::io::Error::from(
                 std::io::ErrorKind::NotConnected,
                 std::io::ErrorKind::NotConnected,
             ))))?
             ))))?
@@ -57,3 +61,75 @@ impl FromRequest for Counter {
         Ok(Counter(get_query_param(req.query_string())))
         Ok(Counter(get_query_param(req.query_string())))
     }
     }
 }
 }
+
+pub async fn get_worlds_by_limit(
+    mut conn: PoolConnection<Postgres>,
+    limit: i64,
+) -> Result<Vec<World>, PgError> {
+    let worlds = sqlx::query_as("SELECT * FROM World LIMIT $1")
+        .bind(limit)
+        .fetch_all(&mut conn)
+        .await?;
+    Ok(worlds)
+}
+
+pub async fn get_world(
+    conn: &mut PoolConnection<Postgres>,
+    id: i32,
+) -> Result<World, PgError> {
+    let world =
+        sqlx::query_as::<_, World>("SELECT id, randomnumber FROM World WHERE id = $1")
+            .bind(id)
+            .fetch_one(conn)
+            .await?;
+    Ok(world)
+}
+
+pub async fn update_worlds(
+    mut conn: PoolConnection<Postgres>,
+    mut rng: WyRand,
+    count: u16,
+) -> Result<Vec<World>, PgError> {
+    let mut worlds = Vec::<World>::with_capacity(count as usize);
+
+    for _ in 0..count {
+        let id = rng.generate_range(RANGE);
+        let rid = rng.generate_range(RANGE);
+        let mut w = get_world(&mut conn, id).await?;
+        w.randomnumber = rid;
+        worlds.push(w);
+    }
+
+    for w in &worlds {
+        let mut args = PgArguments::default();
+        args.add(w.randomnumber);
+        args.add(w.id);
+
+        sqlx::query_with("UPDATE World SET randomNumber = $1 WHERE id = $2", args)
+            .execute(&mut conn)
+            .await?;
+    }
+
+    Ok(worlds)
+}
+
+pub async fn get_fortunes(
+    mut conn: PoolConnection<Postgres>,
+) -> Result<Vec<Fortune>, PgError> {
+    let mut items = sqlx::query("SELECT * FROM Fortune")
+        .map(|row: PgRow| Fortune {
+            id: row.get(0),
+            message: Cow::Owned(row.get(1)),
+        })
+        .fetch_all(&mut conn)
+        .await?;
+
+    items.push(Fortune {
+        id: 0,
+        message: Cow::Borrowed("Additional fortune added at request time."),
+    });
+
+    items.sort_by(|it, next| it.message.cmp(&next.message));
+
+    Ok(items)
+}

+ 6 - 3
frameworks/Rust/viz/src/main.rs

@@ -1,6 +1,7 @@
 use serde::Serialize;
 use serde::Serialize;
 use viz::{
 use viz::{
-    header::SERVER, Error, Request, Response, ResponseExt, Result, Router, ServiceMaker,
+    header::{HeaderValue, SERVER},
+    Error, Request, Response, ResponseExt, Result, Router, ServiceMaker,
 };
 };
 
 
 mod server;
 mod server;
@@ -13,7 +14,8 @@ struct Message {
 
 
 async fn plaintext(_: Request) -> Result<Response> {
 async fn plaintext(_: Request) -> Result<Response> {
     let mut res = Response::text("Hello, World!");
     let mut res = Response::text("Hello, World!");
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
@@ -21,7 +23,8 @@ async fn json(_: Request) -> Result<Response> {
     let mut res = Response::json(Message {
     let mut res = Response::json(Message {
         message: "Hello, World!",
         message: "Hello, World!",
     })?;
     })?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 

+ 159 - 0
frameworks/Rust/viz/src/main_diesel.rs

@@ -0,0 +1,159 @@
+#[macro_use]
+extern crate diesel;
+
+use std::{
+    convert::identity,
+    thread::{available_parallelism, spawn},
+};
+
+use diesel_async::{
+    pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager},
+    AsyncPgConnection,
+};
+use nanorand::{Rng, WyRand};
+use once_cell::sync::OnceCell;
+use viz::{
+    header::{HeaderValue, SERVER},
+    types::State,
+    Request, RequestExt, Response, ResponseExt, Result, Router, ServiceMaker,
+};
+
+mod db_diesel;
+pub mod models_diesel;
+pub mod schema;
+mod server;
+mod utils;
+
+use db_diesel::*;
+use models_diesel::World;
+use utils::RANGE;
+
+const DB_URL: &str =
+    "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+static CACHED: OnceCell<Vec<World>> = OnceCell::new();
+
+async fn db(req: Request) -> Result<Response> {
+    let mut rng = req.state::<WyRand>().unwrap();
+    let pool = req.state::<Pool<AsyncPgConnection>>();
+
+    let random_id = rng.generate_range(RANGE);
+
+    let world = get_world(pool, random_id).await?;
+
+    let mut res = Response::json(world)?;
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
+    Ok(res)
+}
+
+async fn fortunes(req: Request) -> Result<Response> {
+    let pool = req.state::<Pool<AsyncPgConnection>>();
+
+    let fortunes = tell_fortune(pool).await?;
+
+    let mut res = Response::html(fortunes);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
+    Ok(res)
+}
+
+async fn queries(req: Request) -> Result<Response> {
+    let rng = req.state::<WyRand>().unwrap();
+    let pool = req.state::<Pool<AsyncPgConnection>>();
+    let count = utils::get_query_param(req.query_string());
+
+    let worlds = get_worlds(pool, rng, count).await?;
+
+    let mut res = Response::json(worlds)?;
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
+    Ok(res)
+}
+
+async fn cached_queries(req: Request) -> Result<Response> {
+    let count = utils::get_query_param(req.query_string());
+    let mut rng = WyRand::new();
+
+    let worlds = (0..count)
+        .map(|_| {
+            let id = rng.generate_range(RANGE) as usize;
+            CACHED.get()?.get(id)
+        })
+        .filter_map(identity)
+        .collect::<Vec<_>>();
+
+    let mut res = Response::json(worlds)?;
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
+    Ok(res)
+}
+
+async fn updates(req: Request) -> Result<Response> {
+    let rng = req.state::<WyRand>().unwrap();
+    let pool = req.state::<Pool<AsyncPgConnection>>();
+    let count = utils::get_query_param(req.query_string());
+
+    let worlds = update_worlds(pool, rng, count).await?;
+
+    let mut res = Response::json(worlds)?;
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
+    Ok(res)
+}
+
+async fn populate_cache(pool: Option<Pool<AsyncPgConnection>>) -> Result<()> {
+    let worlds = get_worlds_by_limit(pool, 10_000).await?;
+    CACHED.set(worlds).unwrap();
+    Ok(())
+}
+
+fn main() {
+    let max = available_parallelism().map(|n| n.get()).unwrap_or(16) as usize;
+
+    let pool =
+        Pool::<AsyncPgConnection>::builder(AsyncDieselConnectionManager::new(DB_URL))
+            .max_size(max)
+            .wait_timeout(None)
+            .create_timeout(None)
+            .recycle_timeout(None)
+            .build()
+            .unwrap();
+
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    rt.block_on(populate_cache(Some(pool.clone())))
+        .expect("cache insert failed");
+
+    let rng = WyRand::new();
+
+    let service = ServiceMaker::from(
+        Router::new()
+            .get("/db", db)
+            .get("/fortunes", fortunes)
+            .get("/queries", queries)
+            .get("/updates", updates)
+            .with(State::new(pool))
+            .get("/cached_queries", cached_queries)
+            .with(State::new(rng)),
+    );
+
+    for _ in 1..max {
+        let service = service.clone();
+        spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve(service));
+        });
+    }
+
+    rt.block_on(serve(service));
+}
+
+async fn serve(service: ServiceMaker) {
+    server::builder().serve(service).await.unwrap()
+}

+ 27 - 23
frameworks/Rust/viz/src/main_pg.rs

@@ -7,8 +7,9 @@ use std::{
 use nanorand::{Rng, WyRand};
 use nanorand::{Rng, WyRand};
 use once_cell::sync::OnceCell;
 use once_cell::sync::OnceCell;
 use viz::{
 use viz::{
-    header::SERVER, types::State, Error, Request, RequestExt, Response, ResponseExt,
-    Result, Router, ServiceMaker,
+    header::{HeaderValue, SERVER},
+    types::State,
+    Request, RequestExt, Response, ResponseExt, Result, Router, ServiceMaker,
 };
 };
 use yarte::ywrite_html;
 use yarte::ywrite_html;
 
 
@@ -17,44 +18,47 @@ mod models;
 mod server;
 mod server;
 mod utils;
 mod utils;
 
 
-use db_pg::{PgConnection, PgError};
-use utils::{HDR_SERVER, RANGE};
+use db_pg::{get_conn, PgConnection};
+use utils::RANGE;
 
 
 const DB_URL: &str =
 const DB_URL: &str =
     "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
     "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 static CACHED: OnceCell<Vec<models::World>> = OnceCell::new();
 static CACHED: OnceCell<Vec<models::World>> = OnceCell::new();
 
 
 async fn db(req: Request) -> Result<Response> {
 async fn db(req: Request) -> Result<Response> {
-    let db = req.state::<Arc<PgConnection>>().ok_or(PgError::Connect)?;
+    let conn = get_conn(req.state::<Arc<PgConnection>>())?;
 
 
-    let world = db.get_world().await?;
+    let world = conn.get_world().await?;
 
 
     let mut res = Response::json(world)?;
     let mut res = Response::json(world)?;
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
 async fn fortunes(req: Request) -> Result<Response> {
 async fn fortunes(req: Request) -> Result<Response> {
-    let db = req.state::<Arc<PgConnection>>().ok_or(PgError::Connect)?;
+    let conn = get_conn(req.state::<Arc<PgConnection>>())?;
 
 
-    let fortunes = db.tell_fortune().await?;
+    let fortunes = conn.tell_fortune().await?;
 
 
     let mut buf = String::with_capacity(2048);
     let mut buf = String::with_capacity(2048);
     ywrite_html!(buf, "{{> fortune }}");
     ywrite_html!(buf, "{{> fortune }}");
 
 
     let mut res = Response::html(buf);
     let mut res = Response::html(buf);
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
 async fn queries(req: Request) -> Result<Response> {
 async fn queries(req: Request) -> Result<Response> {
-    let db = req.state::<Arc<PgConnection>>().ok_or(PgError::Connect)?;
-
     let count = utils::get_query_param(req.query_string());
     let count = utils::get_query_param(req.query_string());
-    let worlds = db.get_worlds(count).await?;
+    let conn = get_conn(req.state::<Arc<PgConnection>>())?;
+
+    let worlds = conn.get_worlds(count).await?;
 
 
     let mut res = Response::json(worlds)?;
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
@@ -71,22 +75,24 @@ async fn cached_queries(req: Request) -> Result<Response> {
         .collect::<Vec<_>>();
         .collect::<Vec<_>>();
 
 
     let mut res = Response::json(worlds)?;
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
 async fn updates(req: Request) -> Result<Response> {
 async fn updates(req: Request) -> Result<Response> {
-    let db = req.state::<Arc<PgConnection>>().ok_or(PgError::Connect)?;
-
     let count = utils::get_query_param(req.query_string());
     let count = utils::get_query_param(req.query_string());
-    let worlds = db.update(count).await?;
+    let conn = get_conn(req.state::<Arc<PgConnection>>())?;
+
+    let worlds = conn.update(count).await?;
 
 
     let mut res = Response::json(worlds)?;
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
-async fn populate_cache() -> Result<(), Error> {
+async fn populate_cache() -> Result<()> {
     let conn = PgConnection::connect(DB_URL).await;
     let conn = PgConnection::connect(DB_URL).await;
     let worlds = conn.get_worlds_by_limit(10_000).await?;
     let worlds = conn.get_worlds_by_limit(10_000).await?;
     CACHED.set(worlds).unwrap();
     CACHED.set(worlds).unwrap();
@@ -99,9 +105,7 @@ fn main() {
         .build()
         .build()
         .unwrap();
         .unwrap();
 
 
-    rt.block_on(async {
-        populate_cache().await.expect("cache insert failed");
-    });
+    rt.block_on(populate_cache()).expect("cache insert failed");
 
 
     for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) {
     for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) {
         spawn(move || {
         spawn(move || {

+ 36 - 85
frameworks/Rust/viz/src/main_sqlx.rs

@@ -1,11 +1,13 @@
-use std::{borrow::Cow, convert::identity};
+use std::{convert::identity, thread::available_parallelism};
 
 
 use nanorand::{Rng, WyRand};
 use nanorand::{Rng, WyRand};
 use once_cell::sync::OnceCell;
 use once_cell::sync::OnceCell;
-use sqlx::{postgres::PgRow, Arguments, Pool, Row};
+use sqlx::Pool;
 use viz::{
 use viz::{
-    header::SERVER, types::State, BytesMut, Error, Request, RequestExt, Response,
-    ResponseExt, Result, Router, ServiceMaker,
+    header::{HeaderValue, SERVER},
+    types::State,
+    BytesMut, Error, Request, RequestExt, Response, ResponseExt, Result, Router,
+    ServiceMaker,
 };
 };
 
 
 mod db_sqlx;
 mod db_sqlx;
@@ -13,11 +15,9 @@ mod models_sqlx;
 mod server;
 mod server;
 mod utils;
 mod utils;
 
 
-use db_sqlx::{
-    Counter, DatabaseConnection, PgArguments, PgError, PgPoolOptions, Postgres,
-};
+use db_sqlx::*;
 use models_sqlx::{Fortune, World};
 use models_sqlx::{Fortune, World};
-use utils::{HDR_SERVER, RANGE};
+use utils::RANGE;
 
 
 const DB_URL: &str =
 const DB_URL: &str =
     "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
     "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
@@ -29,42 +29,25 @@ async fn db(mut req: Request) -> Result<Response> {
 
 
     let random_id = rng.generate_range(RANGE);
     let random_id = rng.generate_range(RANGE);
 
 
-    let world: World =
-        sqlx::query_as("SELECT id, randomnumber FROM World WHERE id = $1")
-            .bind(random_id)
-            .fetch_one(&mut conn)
-            .await
-            .map_err(PgError)?;
+    let world = get_world(&mut conn, random_id).await?;
 
 
     let mut res = Response::json(world)?;
     let mut res = Response::json(world)?;
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
 async fn fortunes(mut req: Request) -> Result<Response> {
 async fn fortunes(mut req: Request) -> Result<Response> {
-    let DatabaseConnection(mut conn) = req.extract::<DatabaseConnection>().await?;
-
-    let mut items: Vec<Fortune> = sqlx::query("SELECT * FROM Fortune")
-        .map(|row: PgRow| Fortune {
-            id: row.get(0),
-            message: Cow::Owned(row.get(1)),
-        })
-        .fetch_all(&mut conn)
-        .await
-        .map_err(PgError)?;
-
-    items.push(Fortune {
-        id: 0,
-        message: Cow::Borrowed("Additional fortune added at request time."),
-    });
+    let DatabaseConnection(conn) = req.extract::<DatabaseConnection>().await?;
 
 
-    items.sort_by(|it, next| it.message.cmp(&next.message));
+    let items = get_fortunes(conn).await?;
 
 
     let mut buf = BytesMut::with_capacity(2048);
     let mut buf = BytesMut::with_capacity(2048);
     buf.extend(FortunesTemplate { items }.to_string().as_bytes());
     buf.extend(FortunesTemplate { items }.to_string().as_bytes());
 
 
     let mut res = Response::html(buf.freeze());
     let mut res = Response::html(buf.freeze());
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
@@ -73,23 +56,17 @@ async fn queries(mut req: Request) -> Result<Response> {
         .extract::<(Counter, State<WyRand>, DatabaseConnection)>()
         .extract::<(Counter, State<WyRand>, DatabaseConnection)>()
         .await?;
         .await?;
 
 
-    let mut worlds = Vec::with_capacity(count as usize);
+    let mut worlds = Vec::<World>::with_capacity(count as usize);
 
 
     for _ in 0..count {
     for _ in 0..count {
         let id = rng.generate_range(RANGE);
         let id = rng.generate_range(RANGE);
-
-        let world: World =
-            sqlx::query_as("SELECT id, randomnumber FROM World WHERE id = $1")
-                .bind(id)
-                .fetch_one(&mut conn)
-                .await
-                .map_err(PgError)?;
-
-        worlds.push(world);
+        let w = get_world(&mut conn, id).await?;
+        worlds.push(w);
     }
     }
 
 
     let mut res = Response::json(worlds)?;
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
@@ -106,65 +83,39 @@ async fn cached_queries(mut req: Request) -> Result<Response> {
         .collect::<Vec<_>>();
         .collect::<Vec<_>>();
 
 
     let mut res = Response::json(worlds)?;
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
 async fn updates(mut req: Request) -> Result<Response> {
 async fn updates(mut req: Request) -> Result<Response> {
-    let (Counter(count), State(mut rng), DatabaseConnection(mut conn)) = req
+    let (Counter(count), State(rng), DatabaseConnection(conn)) = req
         .extract::<(Counter, State<WyRand>, DatabaseConnection)>()
         .extract::<(Counter, State<WyRand>, DatabaseConnection)>()
         .await?;
         .await?;
 
 
-    let mut worlds = Vec::with_capacity(count as usize);
-
-    for _ in 0..count {
-        let id = rng.generate_range(RANGE);
-
-        let world: World =
-            sqlx::query_as("SELECT id, randomnumber FROM World WHERE id = $1")
-                .bind(id)
-                .fetch_one(&mut conn)
-                .await
-                .map_err(PgError)?;
-
-        worlds.push(world);
-    }
-
-    for w in &mut worlds {
-        let randomnumber = rng.generate_range(RANGE);
-        let mut args = PgArguments::default();
-        args.add(randomnumber);
-        args.add(w.id);
-        w.randomnumber = randomnumber;
-
-        sqlx::query_with("UPDATE World SET randomNumber = $1 WHERE id = $2", args)
-            .execute(&mut conn)
-            .await
-            .map_err(PgError)?;
-    }
+    let worlds = update_worlds(conn, rng, count).await?;
 
 
     let mut res = Response::json(worlds)?;
     let mut res = Response::json(worlds)?;
-    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Viz"));
     Ok(res)
     Ok(res)
 }
 }
 
 
-async fn populate_cache(pool: Pool<Postgres>) -> Result<(), Error> {
-    let mut conn = pool.acquire().await.map_err(Error::normal)?;
-
-    let worlds: Vec<World> = sqlx::query_as("SELECT * FROM World LIMIT $1")
-        .bind(10_000)
-        .fetch_all(&mut conn)
-        .await
-        .map_err(PgError)?;
-    CACHED.set(worlds).unwrap();
-    Ok(())
+async fn populate_cache(pool: Pool<Postgres>) -> Result<()> {
+    let conn = pool.acquire().await.map_err(Error::normal)?;
+    let worlds = get_worlds_by_limit(conn, 10_000).await?;
+    CACHED
+        .set(worlds)
+        .map_err(|_| PgError::from(sqlx::Error::RowNotFound).into())
 }
 }
 
 
 #[tokio::main]
 #[tokio::main]
 async fn main() -> Result<()> {
 async fn main() -> Result<()> {
+    let max = available_parallelism().map(|n| n.get()).unwrap_or(16) as u32;
+
     let pool = PgPoolOptions::new()
     let pool = PgPoolOptions::new()
-        .max_connections(56)
-        .min_connections(56)
+        .max_connections(max)
+        .min_connections(max)
         .connect(DB_URL)
         .connect(DB_URL)
         .await
         .await
         .map_err(PgError)?;
         .map_err(PgError)?;

+ 1 - 1
frameworks/Rust/viz/src/models.rs

@@ -2,7 +2,7 @@ use std::borrow::Cow;
 
 
 use serde::Serialize;
 use serde::Serialize;
 
 
-#[derive(Clone, Copy, Serialize, Debug, yarte::Serialize)]
+#[derive(Serialize, Debug, yarte::Serialize)]
 pub struct World {
 pub struct World {
     pub id: i32,
     pub id: i32,
     pub randomnumber: i32,
     pub randomnumber: i32,

+ 30 - 0
frameworks/Rust/viz/src/models_diesel.rs

@@ -0,0 +1,30 @@
+use std::borrow::Cow;
+
+use diesel::Queryable;
+use sailfish::TemplateOnce;
+use serde::Serialize;
+
+#[derive(Serialize, Queryable, Debug)]
+pub struct World {
+    pub id: i32,
+    pub randomnumber: i32,
+}
+
+#[derive(Serialize, Queryable, Debug)]
+pub struct Fortune {
+    pub id: i32,
+    pub message: Cow<'static, str>,
+}
+
+#[derive(TemplateOnce)]
+#[template(path = "fortune.stpl", rm_whitespace = true)]
+pub struct Fortunes {
+    items: Vec<Fortune>,
+}
+
+impl Fortunes {
+    #[inline]
+    pub fn new(items: Vec<Fortune>) -> Self {
+        Self { items }
+    }
+}

+ 2 - 2
frameworks/Rust/viz/src/models_sqlx.rs

@@ -3,13 +3,13 @@ use std::borrow::Cow;
 use serde::{Deserialize, Serialize};
 use serde::{Deserialize, Serialize};
 use sqlx::FromRow;
 use sqlx::FromRow;
 
 
-#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, FromRow)]
+#[derive(Debug, PartialEq, Deserialize, Serialize, FromRow)]
 pub struct World {
 pub struct World {
     pub id: i32,
     pub id: i32,
     pub randomnumber: i32,
     pub randomnumber: i32,
 }
 }
 
 
-#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
+#[derive(Debug, PartialEq, Deserialize, Serialize, FromRow)]
 pub struct Fortune {
 pub struct Fortune {
     pub id: i32,
     pub id: i32,
     pub message: Cow<'static, str>,
     pub message: Cow<'static, str>,

+ 13 - 0
frameworks/Rust/viz/src/schema.rs

@@ -0,0 +1,13 @@
+table! {
+    world (id) {
+        id -> Integer,
+        randomnumber -> Integer,
+    }
+}
+
+table! {
+    fortune (id) {
+        id -> Integer,
+        message -> Text,
+    }
+}

+ 2 - 4
frameworks/Rust/viz/src/server.rs

@@ -12,10 +12,8 @@ pub fn builder() -> hyper::server::Builder<AddrIncoming> {
     println!("Started viz server at 8080");
     println!("Started viz server at 8080");
 
 
     viz::Server::builder(incoming)
     viz::Server::builder(incoming)
-        .tcp_keepalive(None)
-        .tcp_nodelay(true)
         .http1_only(true)
         .http1_only(true)
-        .http1_pipeline_flush(true)
+        .tcp_nodelay(true)
 }
 }
 
 
 fn reuse_listener(addr: SocketAddr) -> io::Result<TcpListener> {
 fn reuse_listener(addr: SocketAddr) -> io::Result<TcpListener> {
@@ -27,7 +25,7 @@ fn reuse_listener(addr: SocketAddr) -> io::Result<TcpListener> {
     #[cfg(unix)]
     #[cfg(unix)]
     {
     {
         if let Err(e) = socket.set_reuseport(true) {
         if let Err(e) = socket.set_reuseport(true) {
-            eprintln!("error setting SO_REUSEPORT: {}", e);
+            eprintln!("error setting SO_REUSEPORT: {e}");
         }
         }
     }
     }
 
 

+ 8 - 11
frameworks/Rust/viz/src/utils.rs

@@ -1,20 +1,17 @@
-use std::{ops::Range};
+use std::ops::Range;
 
 
 use atoi::FromRadix10;
 use atoi::FromRadix10;
-use viz::header::HeaderValue;
 
 
 #[allow(dead_code)]
 #[allow(dead_code)]
 pub const RANGE: Range<i32> = 1..10_001;
 pub const RANGE: Range<i32> = 1..10_001;
 
 
-pub const HDR_SERVER: HeaderValue = HeaderValue::from_static("VIZ");
-
 #[allow(dead_code)]
 #[allow(dead_code)]
 pub fn get_query_param(query: Option<&str>) -> u16 {
 pub fn get_query_param(query: Option<&str>) -> u16 {
-    let query = query.unwrap_or("");
-    let q = if let Some(pos) = query.find('q') {
-        u16::from_radix_10(query.split_at(pos + 2).1.as_ref()).0
-    } else {
-        1
-    };
-    q.clamp(1, 500)
+    query
+        .and_then(|s| {
+            s.find('q')
+                .map(|p| u16::from_radix_10(s.split_at(p + 2).1.as_ref()).0)
+        })
+        .map(|n| n.clamp(1, 500))
+        .unwrap_or(1)
 }
 }

+ 10 - 0
frameworks/Rust/viz/templates/fortune.stpl

@@ -0,0 +1,10 @@
+<!DOCTYPE html>
+<html>
+  <head><title>Fortunes</title></head>
+  <body>
+    <table>
+      <tr><th>id</th><th>message</th></tr>
+      <% for item in items { %><tr><td><%= item.id %></td><td><%= &*item.message %></td></tr><% } %>
+    </table>
+  </body>
+</html>

+ 13 - 0
frameworks/Rust/viz/viz-diesel.dockerfile

@@ -0,0 +1,13 @@
+FROM rust:1.66.0
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /viz
+WORKDIR /viz
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin viz-diesel --features="diesel,diesel-async,sailfish"
+
+EXPOSE 8080
+
+CMD ./target/release/viz-diesel

+ 1 - 1
frameworks/Rust/viz/viz-pg.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.65.0
+FROM rust:1.66.0
 
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 
 

+ 1 - 1
frameworks/Rust/viz/viz-sqlx.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.65.0
+FROM rust:1.66.0
 
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 
 

+ 1 - 1
frameworks/Rust/viz/viz.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.65.0
+FROM rust:1.66.0
 
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 RUN apt-get update -yqq && apt-get install -yqq cmake g++