Browse Source

Salvo 0.38 and use moka for cache (#8110)

* salvo-0.38

* salvo 0.38

* Use moka library for cache

* change server header #8060

* Update config file

* Use fixed rust version and remove diesel cache test

* remove rev = "f14f819"

* Use static HeaderValue

* Fix salvo-pg error
Chrislearn Young 2 năm trước cách đây
mục cha
commit
1d107de3ff

+ 14 - 9
frameworks/Rust/salvo/Cargo.toml

@@ -15,22 +15,26 @@ path = "src/main_diesel.rs"
 name = "main-pg"
 path = "src/main_pg.rs"
 
+[[bin]]
+name = "main-moka"
+path = "src/main_moka.rs"
+
 [dependencies]
 anyhow = "1"
 async-trait = "0.1"
 bytes = "1"
-diesel = { version = "1.4", features = ["postgres", "r2d2"] }
-futures = "0.3"
+diesel = { version = "2", features = ["postgres", "r2d2"] }
+futures-util = "0.3"
+moka = "0.10"
 markup = "0.13"
-# mimalloc = { version = "0.1", default-features = false }
+mimalloc = { version = "0.1", default-features = false }
 once_cell = "1"
 rand = { version = "0.8", features = ["min_const_gen", "small_rng"] }
-random-fast-rng = "0.1"
-salvo = { version = "0.34", default-features = false, features = ["anyhow"] }
+salvo = { version = "0.38", default-features = false, features = ["anyhow", "http1"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
-smallvec = "1"
-snmalloc-rs = { version = "0.3", features = ["native-cpu"] }
+# smallvec = "1"
+# snmalloc-rs = { version = "0.3", features = ["native-cpu"] }
 tokio = { version = "1", features = ["macros", "rt"] }
 tokio-postgres = "0.7"
 v_htmlescape = "0.15"
@@ -40,5 +44,6 @@ lto = true
 opt-level = 3
 codegen-units = 1
 panic = "abort"
-debug = false
-incremental = false
+
+[patch.crates-io]
+salvo = { git = "https://github.com/salvo-rs/salvo.git" }

+ 18 - 3
frameworks/Rust/salvo/benchmark_config.json

@@ -26,7 +26,6 @@
         "fortune_url": "/fortunes",
         "query_url": "/queries?q=",
         "update_url": "/updates?q=",
-        "cached_query_url": "/cached_queries?q=",
         "port": 8080,
         "approach": "Realistic",
         "classification": "Micro",
@@ -38,7 +37,7 @@
         "webserver": "Hyper",
         "os": "Linux",
         "database_os": "Linux",
-        "display_name": "Salvo [Diesel]",
+        "display_name": "Salvo [diesel]",
         "notes": "",
         "versus": ""
       },
@@ -47,6 +46,22 @@
         "fortune_url": "/fortunes",
         "query_url": "/queries?q=",
         "update_url": "/updates?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "salvo",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "Rust",
+        "webserver": "Hyper",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Salvo [pg]",
+        "notes": "",
+        "versus": ""
+      },
+      "moka": {
         "cached_query_url": "/cached_queries?q=",
         "port": 8080,
         "approach": "Realistic",
@@ -59,7 +74,7 @@
         "webserver": "Hyper",
         "os": "Linux",
         "database_os": "Linux",
-        "display_name": "Salvo [PG]",
+        "display_name": "Salvo [moka]",
         "notes": "",
         "versus": ""
       }

+ 3 - 5
frameworks/Rust/salvo/salvo-diesel.dockerfile

@@ -1,12 +1,10 @@
-FROM rust:1.62.1
-
-RUN apt-get update -yqq && apt-get install -yqq cmake g++
+FROM rust:1.68.2
 
 ADD ./ /salvo
 WORKDIR /salvo
 
-RUN cargo clean
-RUN RUSTFLAGS="-C target-cpu=native" cargo build --release
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
 
 EXPOSE 8080
 

+ 11 - 0
frameworks/Rust/salvo/salvo-moka.dockerfile

@@ -0,0 +1,11 @@
+FROM rust:1.68.2
+
+ADD ./ /salvo
+WORKDIR /salvo
+
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
+
+EXPOSE 8080
+
+CMD ./target/release/main-moka

+ 3 - 5
frameworks/Rust/salvo/salvo-pg.dockerfile

@@ -1,12 +1,10 @@
-FROM rust:1.62.1
-
-RUN apt-get update -yqq && apt-get install -yqq cmake g++
+FROM rust:1.68.2
 
 ADD ./ /salvo
 WORKDIR /salvo
 
-RUN cargo clean
-RUN RUSTFLAGS="-C target-cpu=native" cargo build --release
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
 
 EXPOSE 8080
 

+ 3 - 8
frameworks/Rust/salvo/salvo.dockerfile

@@ -1,15 +1,10 @@
-FROM rust:1.62.1
-
-# Disable simd at jsonescape
-ENV CARGO_CFG_JSONESCAPE_DISABLE_AUTO_SIMD=
-
-RUN apt-get update -yqq && apt-get install -yqq cmake g++
+FROM rust:1.68.2
 
 ADD ./ /salvo
 WORKDIR /salvo
 
-RUN cargo clean
-RUN RUSTFLAGS="-C target-cpu=native" cargo build --release
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
 
 EXPOSE 8080
 

+ 38 - 19
frameworks/Rust/salvo/src/main.rs

@@ -1,15 +1,21 @@
-// #[global_allocator]
-// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+#[global_allocator]
+static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::sync::Arc;
+use std::thread::available_parallelism;
 
 use bytes::Bytes;
+use salvo::conn::tcp::TcpAcceptor;
+use salvo::http::body::ResBody;
 use salvo::http::header::{self, HeaderValue};
-use salvo::http::response::Body;
 use salvo::prelude::*;
 use serde::Serialize;
 
-mod server;
+mod utils;
+
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+static PLAIN_HEADER: HeaderValue = HeaderValue::from_static("text/plain");
 
 #[derive(Serialize)]
 pub struct Message {
@@ -19,37 +25,50 @@ pub struct Message {
 #[handler]
 fn json(res: &mut Response) {
     let headers = res.headers_mut();
-    headers.insert(header::SERVER, HeaderValue::from_static("S"));
-    headers.insert(
-        header::CONTENT_TYPE,
-        HeaderValue::from_static("application/json"),
-    );
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
     let data = serde_json::to_vec(&Message {
         message: "Hello, World!",
     })
     .unwrap();
-    res.set_body(Body::Once(Bytes::from(data)));
+    res.set_body(ResBody::Once(Bytes::from(data)));
 }
 
 #[handler]
 fn plaintext(res: &mut Response) {
     let headers = res.headers_mut();
-    headers.insert(header::SERVER, HeaderValue::from_static("S"));
-    headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"));
-    res.set_body(Body::Once(Bytes::from_static(b"Hello, world!")));
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, PLAIN_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from_static(b"Hello, world!")));
 }
 
-#[tokio::main]
-async fn main() {
+fn main() {
+    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
     let router = Arc::new(
         Router::new()
             .push(Router::with_path("plaintext").get(plaintext))
             .push(Router::with_path("json").get(json)),
     );
 
-    server::builder()
-        .http1_pipeline_flush(true)
-        .serve(Service::new(router))
-        .await
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
         .unwrap();
+    for _ in 1..size {
+        let router = router.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve(router));
+        });
+    }
+    println!("Started http server: 127.0.0.1:8080");
+    rt.block_on(serve(router));
+}
+
+async fn serve(router: Arc<Router>) {
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
 }

+ 61 - 65
frameworks/Rust/salvo/src/main_diesel.rs

@@ -1,11 +1,12 @@
-#[global_allocator]
-static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 // #[global_allocator]
-// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+#[global_allocator]
+static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[macro_use]
 extern crate diesel;
 
+use bytes::Bytes;
 use std::cmp;
 use std::fmt::Write;
 use std::sync::Arc;
@@ -17,12 +18,14 @@ use diesel::r2d2::{ConnectionManager, Pool, PoolError, PooledConnection};
 use once_cell::sync::OnceCell;
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
+use salvo::conn::tcp::TcpAcceptor;
 use salvo::http::header::{self, HeaderValue};
+use salvo::http::ResBody;
 use salvo::prelude::*;
 
 mod models;
 mod schema;
-mod server;
+mod utils;
 use models::*;
 use schema::*;
 
@@ -30,7 +33,9 @@ const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/he
 type PgPool = Pool<ConnectionManager<PgConnection>>;
 
 static DB_POOL: OnceCell<PgPool> = OnceCell::new();
-static CACHED_WORLDS: OnceCell<Vec<World>> = OnceCell::new();
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8");
 
 fn connect() -> Result<PooledConnection<ConnectionManager<PgConnection>>, PoolError> {
     unsafe { DB_POOL.get_unchecked().get() }
@@ -50,92 +55,87 @@ fn build_pool(database_url: &str, size: u32) -> Result<PgPool, PoolError> {
 async fn world_row(res: &mut Response) -> Result<(), Error> {
     let mut rng = SmallRng::from_entropy();
     let random_id = rng.gen_range(1..10_001);
-    let conn = connect()?;
-    let row = world::table.find(random_id).first::<World>(&conn)?;
-    res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-    res.render(Json(row));
+    let mut conn = connect()?;
+    let world = world::table.find(random_id).first::<World>(&mut conn)?;
+
+    let data = serde_json::to_vec(&world).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
     Ok(())
 }
 
 #[handler]
 async fn queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.query::<usize>("q").unwrap_or(1);
+    let count = req.query::<u16>("q").unwrap_or(1);
     let count = cmp::min(500, cmp::max(1, count));
-    let mut worlds = Vec::with_capacity(count);
+    let mut worlds = Vec::with_capacity(count as usize);
     let mut rng = SmallRng::from_entropy();
-    let conn = connect()?;
+    let mut conn = connect()?;
     for _ in 0..count {
         let id = rng.gen_range(1..10_001);
-        let w = world::table.find(id).get_result::<World>(&conn)?;
+        let w = world::table.find(id).get_result::<World>(&mut conn)?;
         worlds.push(w);
     }
-    res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-    res.render(Json(worlds));
-    Ok(())
-}
 
-#[handler]
-async fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.query::<usize>("q").unwrap_or(1);
-    let count = cmp::min(500, cmp::max(1, count));
-    let mut worlds = Vec::with_capacity(count);
-    let mut rng = SmallRng::from_entropy();
-    for _ in 0..count {
-        let idx = rng.gen_range(0..10_000);
-        unsafe {
-            let w = CACHED_WORLDS.get_unchecked().get(idx).unwrap();
-            worlds.push(w);
-        }
-    }
-    res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-    res.render(Json(worlds));
+    let data = serde_json::to_vec(&worlds).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
     Ok(())
 }
 
 #[handler]
 async fn updates(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.query::<usize>("q").unwrap_or(1);
+    let count = req.query::<u16>("q").unwrap_or(1);
     let count = cmp::min(500, cmp::max(1, count));
-    let conn = connect()?;
-    let mut worlds = Vec::with_capacity(count);
+    let mut conn = connect()?;
+    let mut worlds = Vec::with_capacity(count as usize);
     let mut rng = SmallRng::from_entropy();
     for _ in 0..count {
         let w_id: i32 = rng.gen_range(1..10_001);
-        let mut w = world::table.find(w_id).first::<World>(&conn)?;
+        let mut w = world::table.find(w_id).first::<World>(&mut conn)?;
         w.randomnumber = rng.gen_range(1..10_001);
         worlds.push(w);
     }
     worlds.sort_by_key(|w| w.id);
-    conn.transaction::<(), Error, _>(|| {
+    conn.transaction::<(), Error, _>(|conn| {
         for w in &worlds {
             diesel::update(world::table)
                 .filter(world::id.eq(w.id))
                 .set(world::randomnumber.eq(w.randomnumber))
-                .execute(&conn)?;
+                .execute(conn)?;
         }
         Ok(())
     })?;
 
-    res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-    res.render(Json(worlds));
+    let data = serde_json::to_vec(&worlds).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
     Ok(())
 }
 
 #[handler]
 async fn fortunes(res: &mut Response) -> Result<(), Error> {
-    let conn = connect()?;
-    let mut items = fortune::table.get_results::<Fortune>(&conn)?;
+    let mut conn = connect()?;
+    let mut items = fortune::table.get_results::<Fortune>(&mut conn)?;
     items.push(Fortune {
         id: 0,
         message: "Additional fortune added at request time.".to_string(),
     });
 
     items.sort_by(|it, next| it.message.cmp(&next.message));
-    let mut body = String::new();
-    write!(&mut body, "{}", FortunesTemplate { items }).unwrap();
+    let mut data = String::new();
+    write!(&mut data, "{}", FortunesTemplate { items }).unwrap();
 
-    res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-    res.render(Text::Html(body));
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, HTML_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
     Ok(())
 }
 
@@ -161,27 +161,26 @@ markup::define! {
     }
 }
 
-fn populate_cache() -> Result<(), Error> {
-    let conn = connect()?;
-    let worlds = world::table.limit(10_000).get_results::<World>(&conn)?;
-    CACHED_WORLDS.set(worlds).unwrap();
-    Ok(())
-}
-
 fn main() {
+    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    DB_POOL
+        .set(
+            build_pool(DB_URL, size as u32)
+                .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
+        )
+        .ok();
+
     let router = Arc::new(
         Router::new()
             .push(Router::with_path("db").get(world_row))
             .push(Router::with_path("fortunes").get(fortunes))
             .push(Router::with_path("queries").get(queries))
-            .push(Router::with_path("cached_queries").get(cached_queries))
             .push(Router::with_path("updates").get(updates)),
     );
-    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
-    DB_POOL
-        .set(build_pool(DB_URL, size as u32).unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)))
-        .ok();
-    populate_cache().expect("error cache worlds");
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
     for _ in 1..size {
         let router = router.clone();
         std::thread::spawn(move || {
@@ -192,14 +191,11 @@ fn main() {
             rt.block_on(serve(router));
         });
     }
-    println!("Starting http server: 127.0.0.1:8080");
-    let rt = tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap();
+    println!("Started http server: 127.0.0.1:8080");
     rt.block_on(serve(router));
 }
 
 async fn serve(router: Arc<Router>) {
-    server::builder().serve(Service::new(router)).await.unwrap();
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
 }

+ 97 - 0
frameworks/Rust/salvo/src/main_moka.rs

@@ -0,0 +1,97 @@
+// #[global_allocator]
+// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+#[global_allocator]
+static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+use std::cmp;
+use std::sync::Arc;
+use std::thread::available_parallelism;
+
+use anyhow::Error;
+use bytes::Bytes;
+use moka::sync::Cache as MokaCache;
+use once_cell::sync::OnceCell;
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
+use salvo::conn::tcp::TcpAcceptor;
+use salvo::http::header::{self, HeaderValue};
+use salvo::http::ResBody;
+use salvo::prelude::*;
+
+mod models;
+mod utils;
+use models::*;
+mod pg_conn;
+use pg_conn::PgConnection;
+
+const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+static CACHED_WORLDS: OnceCell<MokaCache<usize, World>> = OnceCell::new();
+
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+
+#[handler]
+fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
+    let count = req.query::<u16>("q").unwrap_or(1);
+    let count = cmp::min(500, cmp::max(1, count));
+    let mut worlds = Vec::with_capacity(count as usize);
+    let mut rng = SmallRng::from_entropy();
+    for _ in 0..count {
+        let idx = rng.gen_range(0..10_000);
+        unsafe {
+            let w = CACHED_WORLDS.get_unchecked().get(&idx).unwrap();
+            worlds.push(w);
+        }
+    }
+    let data = serde_json::to_vec(&worlds).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+async fn populate_cache() -> Result<(), Error> {
+    let conn = PgConnection::create(DB_URL).await?;
+    let worlds = conn.get_worlds(10_000).await?;
+    let cache = MokaCache::new(10_000);
+    for (i, word) in worlds.into_iter().enumerate() {
+        cache.insert(i, word);
+    }
+    CACHED_WORLDS.set(cache).unwrap();
+    Ok(())
+}
+
+fn main() {
+    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    rt.block_on(async {
+        populate_cache().await.expect("error cache worlds");
+    });
+
+    let router = Arc::new(Router::with_path("cached_queries").get(cached_queries));
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    for _ in 1..size{
+        let router = router.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve(router));
+        });
+    }
+    println!("Started http server: 127.0.0.1:8080");
+    rt.block_on(serve(router));
+}
+
+async fn serve(router: Arc<Router>) {
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
+}

+ 72 - 210
frameworks/Rust/salvo/src/main_pg.rs

@@ -1,183 +1,32 @@
-#[global_allocator]
-static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 // #[global_allocator]
-// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+#[global_allocator]
+static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::cmp;
-use std::collections::HashMap;
 use std::fmt::Write;
-use std::io;
+use std::sync::Arc;
 use std::thread::available_parallelism;
 
-use anyhow::Error;
 use async_trait::async_trait;
-use futures::stream::futures_unordered::FuturesUnordered;
-use futures::TryStreamExt;
-use once_cell::sync::OnceCell;
-use rand::distributions::{Distribution, Uniform};
-use rand::rngs::SmallRng;
-use rand::{Rng, SeedableRng};
+use bytes::Bytes;
+use salvo::conn::tcp::TcpAcceptor;
 use salvo::http::header::{self, HeaderValue};
+use salvo::http::ResBody;
 use salvo::prelude::*;
 use salvo::routing::FlowCtrl;
-use tokio_postgres::types::ToSql;
-use tokio_postgres::{self, Client, NoTls, Statement};
 
 mod models;
-mod server;
-use models::*;
+mod pg_conn;
+mod utils;
+use pg_conn::PgConnection;
 
 const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
-static CACHED_WORLDS: OnceCell<Vec<World>> = OnceCell::new();
-type DbResult<T> = Result<T, tokio_postgres::Error>;
-
-struct PgConnection {
-    client: Client,
-    fortune: Statement,
-    world: Statement,
-    updates: HashMap<u16, Statement>,
-}
-
-impl PgConnection {
-    pub async fn create(db_url: &str) -> Result<PgConnection, io::Error> {
-        let (client, conn) = tokio_postgres::connect(db_url, NoTls)
-            .await
-            .expect("can not connect to postgresql");
-        tokio::spawn(async move {
-            if let Err(e) = conn.await {
-                eprintln!("connection error: {}", e);
-            }
-        });
-
-        let fortune = client.prepare("SELECT id, message FROM fortune").await.unwrap();
-        let world = client.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
-        let mut updates = HashMap::new();
-        for num in 1..=500u16 {
-            let mut pl: u16 = 1;
-            let mut q = String::new();
-            q.push_str("UPDATE world SET randomnumber = CASE id ");
-            for _ in 1..=num {
-                let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1);
-                pl += 2;
-            }
-            q.push_str("ELSE randomnumber END WHERE id IN (");
-            for _ in 1..=num {
-                let _ = write!(&mut q, "${},", pl);
-                pl += 1;
-            }
-            q.pop();
-            q.push(')');
-            updates.insert(num, client.prepare(&q).await.unwrap());
-        }
-
-        Ok(PgConnection {
-            client,
-            fortune,
-            world,
-            updates,
-        })
-    }
-
-    async fn query_one_world(&self, w_id: i32) -> DbResult<World> {
-        self.client.query_one(&self.world, &[&w_id]).await.map(|row| World {
-            id: row.get(0),
-            randomnumber: row.get(1),
-        })
-    }
-
-    pub async fn get_world(&self) -> DbResult<World> {
-        let mut rng = SmallRng::from_entropy();
-        let id: i32 = rng.gen_range(1..10_001);
-        self.query_one_world(id).await
-    }
-    pub async fn get_worlds(&self, count: u16) -> DbResult<Vec<World>> {
-        let worlds = {
-            let mut rng = SmallRng::from_entropy();
-            let between = Uniform::from(1..10_001);
-            (0..count)
-                .map(|_| {
-                    let id: i32 = between.sample(&mut rng);
-                    self.query_one_world(id)
-                })
-                .collect::<FuturesUnordered<_>>()
-        };
-
-        worlds.try_collect().await
-    }
-
-    pub async fn update(&self, count: u16) -> DbResult<Vec<World>> {
-        let worlds = {
-            let mut rng = SmallRng::from_entropy();
-            let between = Uniform::from(1..10_001);
-            (0..count)
-                .map(|_| {
-                    let id: i32 = between.sample(&mut rng);
-                    let w_id: i32 = between.sample(&mut rng);
-                    async move {
-                        let mut world = self.query_one_world(w_id).await?;
-                        world.randomnumber = id;
-                        Ok(world)
-                    }
-                })
-                .collect::<FuturesUnordered<_>>()
-        };
-
-        let worlds = worlds.try_collect::<Vec<_>>().await?;
-        let mut params = Vec::<&(dyn ToSql + Sync)>::with_capacity(count as usize * 3);
-        for w in &worlds {
-            params.push(&w.id);
-            params.push(&w.randomnumber);
-        }
-        for w in &worlds {
-            params.push(&w.id);
-        }
-
-        let st = self.updates.get(&count).unwrap();
-        self.client.query(st, params.as_slice()).await?;
-        Ok(worlds)
-    }
 
-    pub async fn tell_fortune(&self) -> DbResult<FortunesTemplate> {
-        let mut items = self
-            .client
-            .query(&self.fortune, &[])
-            .await?
-            .iter()
-            .map(|row| Fortune {
-                id: row.get(0),
-                message: row.get(1),
-            })
-            .collect::<Vec<_>>();
-        items.push(Fortune {
-            id: 0,
-            message: "Additional fortune added at request time.".to_string(),
-        });
-        items.sort_by(|it, next| it.message.cmp(&next.message));
-        Ok(FortunesTemplate { items })
-    }
-}
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8");
 
-markup::define! {
-    FortunesTemplate(items: Vec<Fortune>) {
-        {markup::doctype()}
-        html {
-            head {
-                title { "Fortunes" }
-            }
-            body {
-                table {
-                    tr { th { "id" } th { "message" } }
-                    @for item in items {
-                        tr {
-                            td { {item.id} }
-                            td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
 struct WorldHandler {
     conn: PgConnection,
 }
@@ -185,16 +34,26 @@ impl WorldHandler {
     async fn new() -> Self {
         Self {
             conn: PgConnection::create(DB_URL)
-                .await.unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
+                .await
+                .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
         }
     }
 }
 #[async_trait]
 impl Handler for WorldHandler {
-    async fn handle(&self, _req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
-        res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
+    async fn handle(
+        &self,
+        _req: &mut Request,
+        _depot: &mut Depot,
+        res: &mut Response,
+        _ctrl: &mut FlowCtrl,
+    ) {
         let world = self.conn.get_world().await.unwrap();
-        res.render(Json(world));
+        let data = serde_json::to_vec(&world).unwrap();
+        let headers = res.headers_mut();
+        headers.insert(header::SERVER, SERVER_HEADER.clone());
+        headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+        res.set_body(ResBody::Once(Bytes::from(data)));
     }
 }
 struct WorldsHandler {
@@ -211,12 +70,22 @@ impl WorldsHandler {
 }
 #[async_trait]
 impl Handler for WorldsHandler {
-    async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
+    async fn handle(
+        &self,
+        req: &mut Request,
+        _depot: &mut Depot,
+        res: &mut Response,
+        _ctrl: &mut FlowCtrl,
+    ) {
         let count = req.query::<u16>("q").unwrap_or(1);
         let count = cmp::min(500, cmp::max(1, count));
-        res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
         let worlds = self.conn.get_worlds(count).await.unwrap();
-        res.render(Json(worlds));
+
+        let data = serde_json::to_vec(&worlds).unwrap();
+        let headers = res.headers_mut();
+        headers.insert(header::SERVER, SERVER_HEADER.clone());
+        headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+        res.set_body(ResBody::Once(Bytes::from(data)));
     }
 }
 struct UpdatesHandler {
@@ -233,12 +102,24 @@ impl UpdatesHandler {
 }
 #[async_trait]
 impl Handler for UpdatesHandler {
-    async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
+    async fn handle(
+        &self,
+        req: &mut Request,
+        _depot: &mut Depot,
+        res: &mut Response,
+        _ctrl: &mut FlowCtrl,
+    ) {
         let count = req.query::<u16>("q").unwrap_or(1);
         let count = cmp::min(500, cmp::max(1, count));
-        res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
+        res.headers_mut()
+            .insert(header::SERVER, SERVER_HEADER.clone());
         let worlds = self.conn.update(count).await.unwrap();
-        res.render(Json(worlds));
+
+        let data = serde_json::to_vec(&worlds).unwrap();
+        let headers = res.headers_mut();
+        headers.insert(header::SERVER, SERVER_HEADER.clone());
+        headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+        res.set_body(ResBody::Once(Bytes::from(data)));
     }
 }
 struct FortunesHandler {
@@ -255,48 +136,30 @@ impl FortunesHandler {
 }
 #[async_trait]
 impl Handler for FortunesHandler {
-    async fn handle(&self, _req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
-        let mut body = String::new();
-        write!(&mut body, "{}", self.conn.tell_fortune().await.unwrap()).unwrap();
-        res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-        res.render(Text::Html(body));
-    }
-}
+    async fn handle(
+        &self,
+        _req: &mut Request,
+        _depot: &mut Depot,
+        res: &mut Response,
+        _ctrl: &mut FlowCtrl,
+    ) {
+        let mut data = String::new();
+        write!(&mut data, "{}", self.conn.tell_fortune().await.unwrap()).unwrap();
 
-#[handler]
-async fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.query::<usize>("q").unwrap_or(1);
-    let count = cmp::min(500, cmp::max(1, count));
-    let mut worlds = Vec::with_capacity(count);
-    let mut rng = SmallRng::from_entropy();
-    for _ in 0..count {
-        let idx = rng.gen_range(0..10_000);
-        unsafe {
-            let w = CACHED_WORLDS.get_unchecked().get(idx).unwrap();
-            worlds.push(w);
-        }
+        let headers = res.headers_mut();
+        headers.insert(header::SERVER, SERVER_HEADER.clone());
+        headers.insert(header::CONTENT_TYPE, HTML_HEADER.clone());
+        res.set_body(ResBody::Once(Bytes::from(data)));
     }
-    res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-    res.render(Json(worlds));
-    Ok(())
-}
-
-async fn populate_cache() -> Result<(), Error> {
-    let conn = PgConnection::create(DB_URL).await?;
-    let worlds = conn.get_worlds(10_000).await?;
-    CACHED_WORLDS.set(worlds).unwrap();
-    Ok(())
 }
 
 fn main() {
+    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
         .unwrap();
-    rt.block_on(async {
-        populate_cache().await.expect("error cache worlds");
-    });
-    for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) {
+    for _ in 1..size {
         std::thread::spawn(move || {
             let rt = tokio::runtime::Builder::new_current_thread()
                 .enable_all()
@@ -314,8 +177,7 @@ async fn serve() {
         .push(Router::with_path("db").get(WorldHandler::new().await))
         .push(Router::with_path("fortunes").get(FortunesHandler::new().await))
         .push(Router::with_path("queries").get(WorldsHandler::new().await))
-        .push(Router::with_path("cached_queries").get(cached_queries))
         .push(Router::with_path("updates").get(UpdatesHandler::new().await));
-
-    server::builder().serve(Service::new(router)).await.unwrap();
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
 }

+ 1 - 1
frameworks/Rust/salvo/src/models.rs

@@ -7,7 +7,7 @@ pub struct Message {
 }
 
 #[allow(non_snake_case)]
-#[derive(Serialize, Queryable, Debug)]
+#[derive(Serialize, Queryable, Clone, Debug)]
 pub struct World {
     pub id: i32,
     pub randomnumber: i32,

+ 167 - 0
frameworks/Rust/salvo/src/pg_conn.rs

@@ -0,0 +1,167 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::io;
+
+use futures_util::stream::{FuturesUnordered, TryStreamExt};
+use rand::distributions::{Distribution, Uniform};
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
+use tokio_postgres::types::ToSql;
+use tokio_postgres::{self, Client, NoTls, Statement};
+
+use crate::models::*;
+
+type DbResult<T> = Result<T, tokio_postgres::Error>;
+
+pub struct PgConnection {
+    client: Client,
+    #[allow(dead_code)]
+    fortune: Statement,
+    world: Statement,
+    #[allow(dead_code)]
+    updates: HashMap<u16, Statement>,
+}
+
+impl PgConnection {
+    pub async fn create(db_url: &str) -> Result<PgConnection, io::Error> {
+        let (client, conn) = tokio_postgres::connect(db_url, NoTls)
+            .await
+            .expect("can not connect to postgresql");
+        tokio::spawn(async move {
+            if let Err(e) = conn.await {
+                eprintln!("connection error: {}", e);
+            }
+        });
+
+        let fortune = client.prepare("SELECT id, message FROM fortune").await.unwrap();
+        let world = client.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
+        let mut updates = HashMap::new();
+        for num in 1..=500u16 {
+            let mut pl: u16 = 1;
+            let mut q = String::new();
+            q.push_str("UPDATE world SET randomnumber = CASE id ");
+            for _ in 1..=num {
+                let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1);
+                pl += 2;
+            }
+            q.push_str("ELSE randomnumber END WHERE id IN (");
+            for _ in 1..=num {
+                let _ = write!(&mut q, "${},", pl);
+                pl += 1;
+            }
+            q.pop();
+            q.push(')');
+            updates.insert(num, client.prepare(&q).await.unwrap());
+        }
+
+        Ok(PgConnection {
+            client,
+            fortune,
+            world,
+            updates,
+        })
+    }
+
+    async fn query_one_world(&self, w_id: i32) -> DbResult<World> {
+        self.client.query_one(&self.world, &[&w_id]).await.map(|row| World {
+            id: row.get(0),
+            randomnumber: row.get(1),
+        })
+    }
+
+    #[allow(dead_code)]
+    pub async fn get_world(&self) -> DbResult<World> {
+        let mut rng = SmallRng::from_entropy();
+        let id: i32 = rng.gen_range(1..10_001);
+        self.query_one_world(id).await
+    }
+    pub async fn get_worlds(&self, count: u16) -> DbResult<Vec<World>> {
+        let worlds = {
+            let mut rng = SmallRng::from_entropy();
+            let between = Uniform::from(1..10_001);
+            (0..count)
+                .map(|_| {
+                    let id = between.sample(&mut rng);
+                    self.query_one_world(id)
+                })
+                .collect::<FuturesUnordered<_>>()
+        };
+
+        worlds.try_collect().await
+    }
+
+    #[allow(dead_code)]
+    pub async fn update(&self, count: u16) -> DbResult<Vec<World>> {
+        let worlds = {
+            let mut rng = SmallRng::from_entropy();
+            let between = Uniform::from(1..10_001);
+            (0..count)
+                .map(|_| {
+                    let id: i32 = between.sample(&mut rng);
+                    let w_id: i32 = between.sample(&mut rng);
+                    async move {
+                        let mut world = self.query_one_world(w_id).await?;
+                        world.randomnumber = id;
+                        Ok(world)
+                    }
+                })
+                .collect::<FuturesUnordered<_>>()
+        };
+
+        let worlds = worlds.try_collect::<Vec<_>>().await?;
+        let mut params = Vec::<&(dyn ToSql + Sync)>::with_capacity(count as usize * 3);
+        for w in &worlds {
+            params.push(&w.id);
+            params.push(&w.randomnumber);
+        }
+        for w in &worlds {
+            params.push(&w.id);
+        }
+
+        let st = self.updates.get(&count).unwrap();
+        self.client.query(st, params.as_slice()).await?;
+        Ok(worlds)
+    }
+
+    #[allow(dead_code)]
+    pub async fn tell_fortune(&self) -> DbResult<FortunesTemplate> {
+        let mut items = self
+            .client
+            .query(&self.fortune, &[])
+            .await?
+            .iter()
+            .map(|row| Fortune {
+                id: row.get(0),
+                message: row.get(1),
+            })
+            .collect::<Vec<_>>();
+        items.push(Fortune {
+            id: 0,
+            message: "Additional fortune added at request time.".to_string(),
+        });
+        items.sort_by(|it, next| it.message.cmp(&next.message));
+        Ok(FortunesTemplate { items })
+    }
+}
+
+markup::define! {
+    FortunesTemplate(items: Vec<Fortune>) {
+        {markup::doctype()}
+        html {
+            head {
+                title { "Fortunes" }
+            }
+            body {
+                table {
+                    tr { th { "id" } th { "message" } }
+                    @for item in items {
+                        tr {
+                            td { {item.id} }
+                            td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

+ 0 - 33
frameworks/Rust/salvo/src/server.rs

@@ -1,33 +0,0 @@
-use std::io;
-use std::net::{Ipv4Addr, SocketAddr};
-
-use salvo::hyper;
-use salvo::hyper::server::conn::AddrIncoming;
-use tokio::net::{TcpListener, TcpSocket};
-
-pub fn builder() -> hyper::server::Builder<AddrIncoming> {
-    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080));
-    let listener = reuse_listener(addr).expect("couldn't bind to addr");
-    let incoming = AddrIncoming::from_listener(listener).unwrap();
-    hyper::Server::builder(incoming)
-        .http1_only(true)
-        .tcp_nodelay(true)
-}
-
-fn reuse_listener(addr: SocketAddr) -> io::Result<TcpListener> {
-    let socket = match addr {
-        SocketAddr::V4(_) => TcpSocket::new_v4()?,
-        SocketAddr::V6(_) => TcpSocket::new_v6()?,
-    };
-
-    #[cfg(unix)]
-    {
-        if let Err(e) = socket.set_reuseport(true) {
-            eprintln!("error setting SO_REUSEPORT: {}", e);
-        }
-    }
-
-    socket.set_reuseaddr(true)?;
-    socket.bind(addr)?;
-    socket.listen(1024)
-}

+ 24 - 0
frameworks/Rust/salvo/src/utils.rs

@@ -0,0 +1,24 @@
+use std::io;
+use std::net::{Ipv4Addr, SocketAddr};
+
+use tokio::net::{TcpListener, TcpSocket};
+
+#[allow(dead_code)]
+pub fn reuse_listener() -> io::Result<TcpListener> {
+    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080));
+    let socket = match addr {
+        SocketAddr::V4(_) => TcpSocket::new_v4()?,
+        SocketAddr::V6(_) => TcpSocket::new_v6()?,
+    };
+
+    #[cfg(unix)]
+    {
+        if let Err(e) = socket.set_reuseport(true) {
+            eprintln!("error setting SO_REUSEPORT: {e}");
+        }
+    }
+
+    socket.set_reuseaddr(true)?;
+    socket.bind(addr)?;
+    socket.listen(1024)
+}