
Salvo add mongodb and sqlx (#8126)

* Enable pipeline_flush

* Add mongodb and sqlx

* Add mongodb and sqlx test

* fix main-pg test

* Add max pool and min pool env var

* Fix ci error

* fix typo

* Remove mimalloc
Chrislearn Young, 2 years ago
parent commit 4d90adbd54
29 changed files with 1231 additions and 98 deletions
  1. + 2 - 0     frameworks/Rust/salvo/.gitignore
  2. + 25 - 2    frameworks/Rust/salvo/Cargo.toml
  3. + 87 - 10   frameworks/Rust/salvo/benchmark_config.json
  4. + 3 - 0     frameworks/Rust/salvo/salvo-diesel.dockerfile
  5. + 2 - 0     frameworks/Rust/salvo/salvo-moka.dockerfile
  6. + 15 - 0    frameworks/Rust/salvo/salvo-mongo-raw.dockerfile
  7. + 15 - 0    frameworks/Rust/salvo/salvo-mongo.dockerfile
  8. + 14 - 0    frameworks/Rust/salvo/salvo-pg-pool.dockerfile
  9. + 2 - 0     frameworks/Rust/salvo/salvo-pg.dockerfile
 10. + 15 - 0    frameworks/Rust/salvo/salvo-sqlx.dockerfile
 11. + 52 - 0    frameworks/Rust/salvo/src/db_mongo.rs
 12. + 62 - 0    frameworks/Rust/salvo/src/db_mongo_raw.rs
 13. + 1 - 1     frameworks/Rust/salvo/src/db_pg.rs
 14. + 85 - 0    frameworks/Rust/salvo/src/db_pg_pool.rs
 15. + 35 - 0    frameworks/Rust/salvo/src/db_sqlx.rs
 16. + 8 - 4     frameworks/Rust/salvo/src/main.rs
 17. + 16 - 12   frameworks/Rust/salvo/src/main_diesel.rs
 18. + 14 - 15   frameworks/Rust/salvo/src/main_moka.rs
 19. + 189 - 0   frameworks/Rust/salvo/src/main_mongo.rs
 20. + 153 - 0   frameworks/Rust/salvo/src/main_mongo_raw.rs
 21. + 27 - 48   frameworks/Rust/salvo/src/main_pg.rs
 22. + 197 - 0   frameworks/Rust/salvo/src/main_pg_pool.rs
 23. + 135 - 0   frameworks/Rust/salvo/src/main_sqlx.rs
 24. + 1 - 6     frameworks/Rust/salvo/src/models_diesel.rs
 25. + 14 - 0    frameworks/Rust/salvo/src/models_mongo.rs
 26. + 15 - 0    frameworks/Rust/salvo/src/models_pg.rs
 27. + 19 - 0    frameworks/Rust/salvo/src/models_pg_pool.rs
 28. + 16 - 0    frameworks/Rust/salvo/src/models_sqlx.rs
 29. + 12 - 0    frameworks/Rust/salvo/src/utils.rs

+ 2 - 0
frameworks/Rust/salvo/.gitignore

@@ -1,2 +1,4 @@
+.env
+.vscode
 target
 Cargo.lock
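
The newly ignored `.env` file pairs with the `dotenv` dependency added in Cargo.toml below: the new binaries call `dotenv().ok()` at startup, so a local `.env` can supply the same `TECHEMPOWER_*` variables that the dockerfiles export in the benchmark containers. A minimal sketch of that startup path (hedged: the variable names are taken from the dockerfiles in this commit, and the parsing mirrors the `utils::get_env_var` helper further down):

```rust
// Sketch only: load a local .env (if any), then read the benchmark settings.
// In the containers these variables come from `ENV` lines in the dockerfiles.
use dotenv::dotenv;

fn main() {
    dotenv().ok();

    let db_url = std::env::var("TECHEMPOWER_POSTGRES_URL")
        .expect("TECHEMPOWER_POSTGRES_URL environment variable was not set");
    let max_pool_size: u32 = std::env::var("TECHEMPOWER_MAX_POOL_SIZE")
        .expect("TECHEMPOWER_MAX_POOL_SIZE environment variable was not set")
        .parse()
        .expect("could not parse TECHEMPOWER_MAX_POOL_SIZE");

    println!("connecting to {db_url} with a pool of {max_pool_size}");
}
```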

+ 25 - 2
frameworks/Rust/salvo/Cargo.toml

@@ -15,6 +15,22 @@ path = "src/main_diesel.rs"
 name = "main-pg"
 path = "src/main_pg.rs"
 
+[[bin]]
+name = "main-pg-pool"
+path = "src/main_pg_pool.rs"
+
+[[bin]]
+name = "main-sqlx"
+path = "src/main_sqlx.rs"
+
+[[bin]]
+name = "main-mongo"
+path = "src/main_mongo.rs"
+
+[[bin]]
+name = "main-mongo-raw"
+path = "src/main_mongo_raw.rs"
+
 [[bin]]
 name = "main-moka"
 path = "src/main_moka.rs"
@@ -24,20 +40,27 @@ anyhow = "1"
 async-trait = "0.1"
 bytes = "1"
 diesel = { version = "2", features = ["postgres", "r2d2"] }
+deadpool = { version = "0.9", features = ["rt_tokio_1", "serde", "async-trait", "managed"] }
+deadpool-postgres = "0.10"
 futures-util = "0.3"
 moka = "0.10"
 markup = "0.13"
-mimalloc = { version = "0.1", default-features = false }
+# mimalloc = { version = "0.1", default-features = false }
+mongodb = { version = "2.4.0", features = ["zstd-compression", "snappy-compression", "zlib-compression"] }
 once_cell = "1"
 rand = { version = "0.8", features = ["min_const_gen", "small_rng"] }
-salvo = { version = "0.38", default-features = false, features = ["anyhow", "http1"] }
+salvo = { version = "0.38", default-features = false, features = ["anyhow", "http1", "affix"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 # smallvec = "1"
 # snmalloc-rs = { version = "0.3", features = ["native-cpu"] }
+sqlx = { version = "0.6.2", features = ["postgres", "macros", "runtime-tokio-native-tls"] }
 tokio = { version = "1", features = ["macros", "rt"] }
+tokio-pg-mapper = "0.2.0"
+tokio-pg-mapper-derive = "0.2.0"
 tokio-postgres = "0.7"
 v_htmlescape = "0.15"
+dotenv = "0.15.0"
 
 [profile.release]
 lto = true

+ 87 - 10
frameworks/Rust/salvo/benchmark_config.json

@@ -8,8 +8,8 @@
         "port": 8080,
         "approach": "Realistic",
         "classification": "Micro",
-        "database": "postgres",
-        "framework": "salvo",
+        "database": "none",
+        "framework": "Salvo",
         "language": "Rust",
         "flavor": "None",
         "orm": "Micro",
@@ -30,18 +30,38 @@
         "approach": "Realistic",
         "classification": "Micro",
         "database": "Postgres",
-        "framework": "salvo",
+        "framework": "Salvo",
         "language": "Rust",
         "orm": "Raw",
         "platform": "Rust",
         "webserver": "Hyper",
         "os": "Linux",
         "database_os": "Linux",
-        "display_name": "Salvo [diesel]",
+        "display_name": "Salvo [postgres-diesel]",
         "notes": "",
-        "versus": ""
+        "versus": "None"
       },
       "pg": {
+        "db_url": "/db",
+        "fortune_url": "/fortunes",
+        "query_url": "/queries?q=",
+        "update_url": "/updates?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "postgres",
+        "framework": "Salvo",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "Rust",
+        "webserver": "Hyper",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Salvo [postgres]",
+        "notes": "",
+        "versus": "None"
+      },
+      "pg-pool": {
         "db_url": "/db",
         "fortune_url": "/fortunes",
         "query_url": "/queries?q=",
@@ -50,16 +70,73 @@
         "approach": "Realistic",
         "classification": "Micro",
         "database": "Postgres",
-        "framework": "salvo",
+        "framework": "Salvo",
         "language": "Rust",
         "orm": "Raw",
         "platform": "Rust",
         "webserver": "Hyper",
         "os": "Linux",
         "database_os": "Linux",
-        "display_name": "Salvo [pg]",
+        "display_name": "Salvo [postgres-deadpool]",
         "notes": "",
-        "versus": ""
+        "versus": "None"
+      },
+      "mongo": {
+        "db_url": "/db",
+        "fortune_url": "/fortunes",
+        "query_url": "/queries?q=",
+        "update_url": "/updates?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Mongodb",
+        "framework": "Salvo",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "Rust",
+        "webserver": "Hyper",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Salvo [mongodb]",
+        "notes": "",
+        "versus": "None"
+      },
+      "mongo-raw": {
+        "db_url": "/db",
+        "query_url": "/queries?q=",
+        "update_url": "/updates?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Mongodb",
+        "framework": "Salvo",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "Rust",
+        "webserver": "Hyper",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Salvo [mongodb-raw]",
+        "notes": "",
+        "versus": "None"
+      },
+      "sqlx": {
+        "db_url": "/db",
+        "fortune_url": "/fortunes",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "Salvo",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "Rust",
+        "webserver": "Hyper",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Salvo [postgres-sqlx]",
+        "notes": "",
+        "versus": "None"
       },
       "moka": {
         "cached_query_url": "/cached_queries?q=",
@@ -67,7 +144,7 @@
         "approach": "Realistic",
         "classification": "Micro",
         "database": "Postgres",
-        "framework": "salvo",
+        "framework": "Salvo",
         "language": "Rust",
         "orm": "Raw",
         "platform": "Rust",
@@ -76,7 +153,7 @@
         "database_os": "Linux",
         "display_name": "Salvo [moka]",
         "notes": "",
-        "versus": ""
+        "versus": "None"
       }
     }
   ]

+ 3 - 0
frameworks/Rust/salvo/salvo-diesel.dockerfile

@@ -1,5 +1,8 @@
 FROM rust:1.68.2
 
+ENV TECHEMPOWER_POSTGRES_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
+ENV TECHEMPOWER_MAX_POOL_SIZE=28
+
 ADD ./ /salvo
 WORKDIR /salvo
 

+ 2 - 0
frameworks/Rust/salvo/salvo-moka.dockerfile

@@ -1,5 +1,7 @@
 FROM rust:1.68.2
 
+ENV TECHEMPOWER_POSTGRES_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
+
 ADD ./ /salvo
 WORKDIR /salvo
 

+ 15 - 0
frameworks/Rust/salvo/salvo-mongo-raw.dockerfile

@@ -0,0 +1,15 @@
+FROM rust:1.68.2
+
+ENV TECHEMPOWER_MONGODB_URL=mongodb://tfb-database:27017
+ENV TECHEMPOWER_MAX_POOL_SIZE=28
+ENV TECHEMPOWER_MIN_POOL_SIZE=14
+
+ADD ./ /salvo
+WORKDIR /salvo
+
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
+
+EXPOSE 8080
+
+CMD ./target/release/main-mongo-raw

+ 15 - 0
frameworks/Rust/salvo/salvo-mongo.dockerfile

@@ -0,0 +1,15 @@
+FROM rust:1.68.2
+
+ENV TECHEMPOWER_MONGODB_URL=mongodb://tfb-database:27017
+ENV TECHEMPOWER_MAX_POOL_SIZE=28
+ENV TECHEMPOWER_MIN_POOL_SIZE=14
+
+ADD ./ /salvo
+WORKDIR /salvo
+
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
+
+EXPOSE 8080
+
+CMD ./target/release/main-mongo

+ 14 - 0
frameworks/Rust/salvo/salvo-pg-pool.dockerfile

@@ -0,0 +1,14 @@
+FROM rust:1.68.2
+
+ENV TECHEMPOWER_POSTGRES_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
+ENV TECHEMPOWER_MAX_POOL_SIZE=28
+
+ADD ./ /salvo
+WORKDIR /salvo
+
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
+
+EXPOSE 8080
+
+CMD ./target/release/main-pg-pool

+ 2 - 0
frameworks/Rust/salvo/salvo-pg.dockerfile

@@ -1,5 +1,7 @@
 FROM rust:1.68.2
 
+ENV TECHEMPOWER_POSTGRES_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
+
 ADD ./ /salvo
 WORKDIR /salvo
 

+ 15 - 0
frameworks/Rust/salvo/salvo-sqlx.dockerfile

@@ -0,0 +1,15 @@
+FROM rust:1.68.2
+
+ENV TECHEMPOWER_POSTGRES_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
+ENV TECHEMPOWER_MAX_POOL_SIZE=56
+ENV TECHEMPOWER_MIN_POOL_SIZE=56
+
+ADD ./ /salvo
+WORKDIR /salvo
+
+ENV RUSTFLAGS "-C target-cpu=native"
+RUN cargo build --release
+
+EXPOSE 8080
+
+CMD ./target/release/main-sqlx

+ 52 - 0
frameworks/Rust/salvo/src/db_mongo.rs

@@ -0,0 +1,52 @@
+use futures_util::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use mongodb::{bson::doc, error::Error, Database};
+
+use crate::{Fortune, World};
+
+pub async fn find_world_by_id(db: Database, id: i32) -> Result<World, Error> {
+    let world_collection = db.collection::<World>("world");
+
+    let filter = doc! { "_id": id as f32 };
+    let world: World = world_collection
+        .find_one(Some(filter), None)
+        .await?
+        .expect("expected world, found none");
+    Ok(world)
+}
+
+pub async fn find_worlds(db: Database, ids: Vec<i32>) -> Result<Vec<World>, Error> {
+    let future_worlds = FuturesUnordered::new();
+    for id in ids {
+        future_worlds.push(find_world_by_id(db.clone(), id));
+    }
+    future_worlds.try_collect().await
+}
+
+pub async fn fetch_fortunes(db: Database) -> Result<Vec<Fortune>, Error> {
+    let fortune_collection = db.collection::<Fortune>("fortune");
+    let mut fortune_cursor = fortune_collection.find(None, None).await?;
+    let mut fortunes: Vec<Fortune> = Vec::new();
+    while let Some(doc) = fortune_cursor.next().await {
+        fortunes.push(doc.expect("could not load fortune"));
+    }
+    fortunes.push(Fortune {
+        id: 0,
+        message: "Additional fortune added at request time.".to_string(),
+    });
+    fortunes.sort_by(|a, b| a.message.cmp(&b.message));
+    Ok(fortunes)
+}
+
+pub async fn update_worlds(db: Database, worlds: Vec<World>) -> Result<bool, Error> {
+    let mut updates = Vec::new();
+    for world in worlds {
+        updates.push(doc! {
+        "q": { "id": world.id }, "u": { "$set": { "randomNumber": world.random_number }}
+        });
+    }
+    db.run_command(doc! {"update": "world", "updates": updates, "ordered": false}, None)
+        .await
+        .expect("could not update worlds");
+
+    Ok(true)
+}
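
Taken on its own, the new `db_mongo` module can be exercised roughly like this (a sketch, not part of the commit; the connection string and database name mirror salvo-mongo.dockerfile and `main_mongo.rs` below, and `db_mongo` is assumed to be in scope as a module):

```rust
// Sketch only: connect with the mongodb driver, then issue the same calls
// the /db and /queries handlers make.
use mongodb::{options::ClientOptions, Client};

async fn demo() -> Result<(), mongodb::error::Error> {
    let options = ClientOptions::parse("mongodb://tfb-database:27017").await?;
    let db = Client::with_options(options)?.database("hello_world");

    let one = db_mongo::find_world_by_id(db.clone(), 42).await?;        // single row
    let many = db_mongo::find_worlds(db.clone(), vec![1, 2, 3]).await?; // batch of ids
    println!("world {} fetched, {} worlds in batch", one.id, many.len());
    Ok(())
}
```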

+ 62 - 0
frameworks/Rust/salvo/src/db_mongo_raw.rs

@@ -0,0 +1,62 @@
+use futures_util::{stream::FuturesUnordered, TryStreamExt};
+use mongodb::{
+    bson::{doc, RawDocumentBuf},
+    error::Error,
+    Database,
+};
+
+use crate::World;
+
+pub async fn find_world_by_id(db: Database, id: i32) -> Result<World, Error> {
+    let world_collection = db.collection::<RawDocumentBuf>("world");
+
+    let filter = doc! { "_id": id as f32 };
+
+    let raw: RawDocumentBuf = world_collection
+        .find_one(Some(filter), None)
+        .await
+        .unwrap()
+        .expect("expected world, found none");
+
+    Ok(World {
+        id: raw
+            .get("id")
+            .expect("expected to parse world id")
+            .expect("could not get world id")
+            .as_i32()
+            .expect("could not extract world id"),
+        random_number: raw
+            .get("id")
+            .expect("expected to parse world id")
+            .expect("could not get world id")
+            .as_i32()
+            .expect("could not extract world id"),
+    })
+}
+
+pub async fn find_worlds(db: Database, ids: Vec<i32>) -> Result<Vec<World>, Error> {
+    let future_worlds = FuturesUnordered::new();
+
+    for id in ids {
+        future_worlds.push(find_world_by_id(db.clone(), id));
+    }
+
+    let worlds: Result<Vec<World>, Error> = future_worlds.try_collect().await;
+    worlds
+}
+
+pub async fn update_worlds(db: Database, worlds: Vec<World>) -> Result<bool, Error> {
+    let mut updates = Vec::new();
+
+    for world in worlds {
+        updates.push(doc! {
+        "q": { "id": world.id }, "u": { "$set": { "randomNumber": world.random_number }}
+        });
+    }
+
+    db.run_command(doc! {"update": "world", "updates": updates, "ordered": false}, None)
+        .await
+        .expect("could not update worlds");
+
+    Ok(true)
+}

+ 1 - 1
frameworks/Rust/salvo/src/pg_conn.rs → frameworks/Rust/salvo/src/db_pg.rs

@@ -9,7 +9,7 @@ use rand::{Rng, SeedableRng};
 use tokio_postgres::types::ToSql;
 use tokio_postgres::{self, Client, NoTls, Statement};
 
-use crate::models::*;
+use crate::models_pg::*;
 
 type DbResult<T> = Result<T, tokio_postgres::Error>;
 

+ 85 - 0
frameworks/Rust/salvo/src/db_pg_pool.rs

@@ -0,0 +1,85 @@
+use deadpool_postgres::{Client, Manager, ManagerConfig, RecyclingMethod};
+use tokio_pg_mapper::FromTokioPostgresRow;
+use tokio_postgres::{NoTls, Row, Error, Statement};
+
+use crate::{Fortune, World};
+
+
+pub async fn create_pool(database_url: String, max_pool_size: u32) -> deadpool_postgres::Pool {
+    let pg_config: tokio_postgres::Config = database_url.parse().expect("invalid database url");
+
+    let mgr_config = ManagerConfig {
+        recycling_method: RecyclingMethod::Fast,
+    };
+    let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
+    let pool: deadpool_postgres::Pool = deadpool_postgres::Pool::builder(mgr)
+        .max_size(max_pool_size as usize)
+        .build()
+        .unwrap();
+
+    pool
+}
+
+pub async fn fetch_world_by_id(client: &Client, number: i32, select: &Statement) -> Result<World, Error> {
+    let row: Row = client.query_one(select, &[&number]).await.unwrap();
+
+    Ok(World::from_row(row).unwrap())
+}
+
+pub async fn update_world(client: &Client, update: &Statement, random_id: i32, w_id: i32) -> Result<u64, Error> {
+    let rows_modified: u64 = client.execute(update, &[&random_id, &w_id]).await.unwrap();
+
+    Ok(rows_modified)
+}
+
+pub async fn fetch_all_fortunes(client: Client, select: &Statement) -> Result<Vec<Fortune>, Error> {
+    let rows: Vec<Row> = client.query(select, &[]).await.unwrap();
+
+    let mut fortunes: Vec<Fortune> = Vec::with_capacity(rows.capacity());
+
+    for row in rows {
+        fortunes.push(Fortune::from_row(row).unwrap());
+    }
+
+    Ok(fortunes)
+}
+
+pub async fn prepare_fetch_all_fortunes_statement(client: &Client) -> Statement {
+    client.prepare_cached("SELECT * FROM Fortune").await.unwrap()
+}
+
+pub async fn prepare_fetch_world_by_id_statement(client: &Client) -> Statement {
+    client
+        .prepare_cached("SELECT id, randomnumber FROM World WHERE id = $1")
+        .await
+        .unwrap()
+}
+
+pub async fn prepare_update_world_by_id_statement(client: &Client) -> Statement {
+    client
+        .prepare_cached("UPDATE World SET randomnumber = $1 WHERE id = $2")
+        .await
+        .unwrap()
+}
+
+markup::define! {
+    FortunesTemplate(items: Vec<Fortune>) {
+        {markup::doctype()}
+        html {
+            head {
+                title { "Fortunes" }
+            }
+            body {
+                table {
+                    tr { th { "id" } th { "message" } }
+                    @for item in items {
+                        tr {
+                            td { {item.id} }
+                            td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
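
A sketch of how these deadpool helpers compose for a single lookup (hedged; `main_pg_pool.rs` below does the same per request, and the URL and pool size are the values set in salvo-pg-pool.dockerfile):

```rust
// Sketch only: build the deadpool pool once, then reuse a cached prepared
// statement for each query.
async fn demo() -> Result<(), tokio_postgres::Error> {
    let pool = db_pg_pool::create_pool(
        "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world".to_string(),
        28,
    )
    .await;

    let client = pool.get().await.expect("could not get client from pool");
    let select = db_pg_pool::prepare_fetch_world_by_id_statement(&client).await;
    let world = db_pg_pool::fetch_world_by_id(&client, 42, &select).await?;
    println!("world {} -> {}", world.id, world.randomnumber);
    Ok(())
}
```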

+ 35 - 0
frameworks/Rust/salvo/src/db_sqlx.rs

@@ -0,0 +1,35 @@
+use sqlx::{
+    pool::PoolConnection,
+    postgres::{PgArguments, PgPoolOptions},
+    Arguments, PgPool, Postgres, Error
+};
+
+use crate::{Fortune, World};
+
+pub async fn create_pool(database_url: String, max_pool_size: u32, min_pool_size: u32) -> PgPool {
+    PgPoolOptions::new()
+        .max_connections(max_pool_size)
+        .min_connections(min_pool_size)
+        .connect(&database_url)
+        .await
+        .unwrap()
+}
+
+pub async fn fetch_world(mut conn: PoolConnection<Postgres>, number: i32) -> Result<World, Error> {
+    let mut args = PgArguments::default();
+    args.add(number);
+
+    let world: World = sqlx::query_as_with("SELECT id, randomnumber FROM World WHERE id = $1", args)
+        .fetch_one(&mut conn)
+        .await
+        .expect("error loading world");
+    Ok(world)
+}
+
+pub async fn fetch_fortunes(mut conn: PoolConnection<Postgres>) -> Result<Vec<Fortune>, Error> {
+    let fortunes: Vec<Fortune> = sqlx::query_as("SELECT * FROM Fortune")
+        .fetch_all(&mut conn)
+        .await
+        .expect("error loading Fortunes");
+    Ok(fortunes)
+}
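
Likewise for the sqlx variant, a hedged usage sketch (pool sizes are the ones salvo-sqlx.dockerfile sets; `main_sqlx.rs` below stores the pool in a `OnceCell` instead of a local):

```rust
// Sketch only: create the PgPool, acquire a connection per call, and run one
// query of each kind the handlers use.
async fn demo() -> Result<(), sqlx::Error> {
    let pool = db_sqlx::create_pool(
        "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world".to_string(),
        56,
        56,
    )
    .await;

    let world = db_sqlx::fetch_world(pool.acquire().await?, 42).await?;
    let fortunes = db_sqlx::fetch_fortunes(pool.acquire().await?).await?;
    println!("world {}, {} fortunes", world.id, fortunes.len());
    Ok(())
}
```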

+ 8 - 4
frameworks/Rust/salvo/src/main.rs

@@ -1,5 +1,5 @@
-#[global_allocator]
-static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::sync::Arc;
 use std::thread::available_parallelism;
@@ -16,6 +16,7 @@ mod utils;
 static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
 static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
 static PLAIN_HEADER: HeaderValue = HeaderValue::from_static("text/plain");
+static HELLO_WORD: Bytes = Bytes::from_static(b"Hello, world!");
 
 #[derive(Serialize)]
 pub struct Message {
@@ -39,7 +40,7 @@ fn plaintext(res: &mut Response) {
     let headers = res.headers_mut();
     headers.insert(header::SERVER, SERVER_HEADER.clone());
     headers.insert(header::CONTENT_TYPE, PLAIN_HEADER.clone());
-    res.set_body(ResBody::Once(Bytes::from_static(b"Hello, world!")));
+    res.set_body(ResBody::Once(HELLO_WORD.clone()));
 }
 
 fn main() {
@@ -70,5 +71,8 @@ fn main() {
 
 async fn serve(router: Arc<Router>) {
     let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
-    Server::new(acceptor).serve(router).await
+    let mut server = Server::new(acceptor);
+    let http1 = server.http1_mut();
+    http1.pipeline_flush(true);
+    server.serve(router).await
 }
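
For reference, the pipeline_flush change above leaves `serve()` looking like this once applied (a sketch without diff markers, using the same Salvo 0.38 API and the repo's `utils::reuse_listener` as in the diff):

```rust
// Sketch only: serve() after this commit, with HTTP/1 pipeline flushing
// enabled before the server starts accepting requests.
async fn serve(router: std::sync::Arc<salvo::Router>) {
    use salvo::conn::tcp::TcpAcceptor;
    use salvo::prelude::*;

    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
    let mut server = Server::new(acceptor);
    let http1 = server.http1_mut();
    http1.pipeline_flush(true);
    server.serve(router).await
}
```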

+ 16 - 12
frameworks/Rust/salvo/src/main_diesel.rs

@@ -1,7 +1,7 @@
 // #[global_allocator]
 // static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-#[global_allocator]
-static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[macro_use]
 extern crate diesel;
@@ -22,14 +22,14 @@ use salvo::conn::tcp::TcpAcceptor;
 use salvo::http::header::{self, HeaderValue};
 use salvo::http::ResBody;
 use salvo::prelude::*;
+use dotenv::dotenv;
 
-mod models;
+mod models_diesel;
 mod schema;
 mod utils;
-use models::*;
+use models_diesel::*;
 use schema::*;
 
-const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 type PgPool = Pool<ConnectionManager<PgConnection>>;
 
 static DB_POOL: OnceCell<PgPool> = OnceCell::new();
@@ -40,7 +40,7 @@ static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=u
 fn connect() -> Result<PooledConnection<ConnectionManager<PgConnection>>, PoolError> {
     unsafe { DB_POOL.get_unchecked().get() }
 }
-fn build_pool(database_url: &str, size: u32) -> Result<PgPool, PoolError> {
+fn create_pool(database_url: &str, size: u32) -> Result<PgPool, PoolError> {
     let manager = ConnectionManager::<PgConnection>::new(database_url);
     diesel::r2d2::Pool::builder()
         .max_size(size)
@@ -79,7 +79,7 @@ async fn queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
         worlds.push(w);
     }
 
-    let data = serde_json::to_vec(&worlds).unwrap();
+    let data = serde_json::to_vec(&worlds)?;
     let headers = res.headers_mut();
     headers.insert(header::SERVER, SERVER_HEADER.clone());
     headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
@@ -111,7 +111,7 @@ async fn updates(req: &mut Request, res: &mut Response) -> Result<(), Error> {
         Ok(())
     })?;
 
-    let data = serde_json::to_vec(&worlds).unwrap();
+    let data = serde_json::to_vec(&worlds)?;
     let headers = res.headers_mut();
     headers.insert(header::SERVER, SERVER_HEADER.clone());
     headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
@@ -162,11 +162,14 @@ markup::define! {
 }
 
 fn main() {
-    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    dotenv().ok();
+    
+    let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
+    let max_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MAX_POOL_SIZE");
     DB_POOL
         .set(
-            build_pool(DB_URL, size as u32)
-                .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
+            create_pool(&db_url, max_pool_size)
+                .unwrap_or_else(|_| panic!("Error connecting to {}", &db_url)),
         )
         .ok();
 
@@ -177,11 +180,12 @@ fn main() {
             .push(Router::with_path("queries").get(queries))
             .push(Router::with_path("updates").get(updates)),
     );
+    let thread_count = available_parallelism().map(|n| n.get()).unwrap_or(16);
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
         .unwrap();
-    for _ in 1..size {
+    for _ in 1..thread_count {
         let router = router.clone();
         std::thread::spawn(move || {
             let rt = tokio::runtime::Builder::new_current_thread()

+ 14 - 15
frameworks/Rust/salvo/src/main_moka.rs

@@ -1,7 +1,7 @@
 // #[global_allocator]
 // static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-#[global_allocator]
-static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::cmp;
 use std::sync::Arc;
@@ -17,14 +17,14 @@ use salvo::conn::tcp::TcpAcceptor;
 use salvo::http::header::{self, HeaderValue};
 use salvo::http::ResBody;
 use salvo::prelude::*;
+use dotenv::dotenv;
 
-mod models;
+mod models_pg;
 mod utils;
-use models::*;
-mod pg_conn;
-use pg_conn::PgConnection;
+use models_pg::*;
+mod db_pg;
+use db_pg::PgConnection;
 
-const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 static CACHED_WORLDS: OnceCell<MokaCache<usize, World>> = OnceCell::new();
 
 static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
@@ -43,7 +43,7 @@ fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
             worlds.push(w);
         }
     }
-    let data = serde_json::to_vec(&worlds).unwrap();
+    let data = serde_json::to_vec(&worlds)?;
     let headers = res.headers_mut();
     headers.insert(header::SERVER, SERVER_HEADER.clone());
     headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
@@ -52,7 +52,8 @@ fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
 }
 
 async fn populate_cache() -> Result<(), Error> {
-    let conn = PgConnection::create(DB_URL).await?;
+    let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
+    let conn = PgConnection::create(&db_url).await?;
     let worlds = conn.get_worlds(10_000).await?;
     let cache = MokaCache::new(10_000);
     for (i, word) in worlds.into_iter().enumerate() {
@@ -63,7 +64,8 @@ async fn populate_cache() -> Result<(), Error> {
 }
 
 fn main() {
-    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    dotenv().ok();
+    
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
@@ -73,11 +75,8 @@ fn main() {
     });
 
     let router = Arc::new(Router::with_path("cached_queries").get(cached_queries));
-    let rt = tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap();
-    for _ in 1..size{
+    let thread_count = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    for _ in 1..thread_count{
         let router = router.clone();
         std::thread::spawn(move || {
             let rt = tokio::runtime::Builder::new_current_thread()

+ 189 - 0
frameworks/Rust/salvo/src/main_mongo.rs

@@ -0,0 +1,189 @@
+// #[global_allocator]
+// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+use std::cmp;
+use std::fmt::Write;
+use std::thread::available_parallelism;
+use std::time::Duration;
+
+use anyhow::Error;
+use bytes::Bytes;
+use dotenv::dotenv;
+use mongodb::{
+    options::{ClientOptions, Compressor},
+    Client, Database,
+};
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
+use salvo::conn::tcp::TcpAcceptor;
+use salvo::http::header::{self, HeaderValue};
+use salvo::http::ResBody;
+use salvo::prelude::*;
+
+mod db_mongo;
+mod models_mongo;
+mod utils;
+
+use db_mongo::*;
+use models_mongo::*;
+
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8");
+
+#[handler]
+async fn world_row(res: &mut Response, depot: &mut Depot) -> Result<(), Error> {
+    let mut rng = SmallRng::from_entropy();
+    let random_id = rng.gen_range(1..10_001);
+
+    let db = depot.obtain::<Database>().unwrap();
+    let world = find_world_by_id(db.clone(), random_id).await?;
+
+    let data = serde_json::to_vec(&world).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn queries(req: &mut Request, depot: &mut Depot, res: &mut Response) -> Result<(), Error> {
+    let count = req.query::<u16>("q").unwrap_or(1);
+    let count = cmp::min(500, cmp::max(1, count));
+
+    let mut rng = SmallRng::from_entropy();
+    let mut ids: Vec<i32> = Vec::with_capacity(count as usize);
+    for _ in 0..count {
+        ids.push(rng.gen_range(1..10_001));
+    }
+    let db = depot.obtain::<Database>().unwrap();
+    let worlds = find_worlds(db.clone(), ids).await?;
+
+    let data = serde_json::to_vec(&worlds)?;
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn updates(req: &mut Request, depot: &mut Depot, res: &mut Response) -> Result<(), Error> {
+    let count = req.query::<u16>("q").unwrap_or(1);
+    let count = cmp::min(500, cmp::max(1, count));
+
+    let mut rng = SmallRng::from_entropy();
+    let mut ids: Vec<i32> = Vec::with_capacity(count as usize);
+    for _ in 0..count {
+        ids.push(rng.gen_range(1..10_001));
+    }
+
+    let db = depot.obtain::<Database>().unwrap();
+    let mut worlds = find_worlds(db.clone(), ids).await?;
+    for world in &mut worlds {
+        world.random_number = rng.gen_range(1..10_001);
+    }
+    let data = serde_json::to_vec(&worlds)?;
+    update_worlds(db.clone(), worlds).await?;
+
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn fortunes(res: &mut Response, depot: &mut Depot) -> Result<(), Error> {
+    let db = depot.obtain::<Database>().unwrap();
+    let items = fetch_fortunes(db.clone()).await?;
+
+    let mut data = String::new();
+    write!(&mut data, "{}", FortunesTemplate { items })?;
+
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, HTML_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+markup::define! {
+    FortunesTemplate(items: Vec<Fortune>) {
+        {markup::doctype()}
+        html {
+            head {
+                title { "Fortunes" }
+            }
+            body {
+                table {
+                    tr { th { "id" } th { "message" } }
+                    @for item in items {
+                        tr {
+                            td { {item.id} }
+                            td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+fn main() {
+    dotenv().ok();
+
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    let thread_count = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    for _ in 1..thread_count {
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve());
+        });
+    }
+    println!("Started http server: 127.0.0.1:8080");
+    rt.block_on(serve());
+}
+
+async fn serve() {
+    let db_url: String = utils::get_env_var("TECHEMPOWER_MONGODB_URL");
+    let max_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MAX_POOL_SIZE");
+    let min_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MIN_POOL_SIZE");
+    let mut client_options = ClientOptions::parse(db_url).await.unwrap();
+    client_options.max_pool_size = Some(max_pool_size);
+    client_options.min_pool_size = Some(min_pool_size);
+    client_options.connect_timeout = Some(Duration::from_millis(200));
+
+    // the server will select the algorithm it supports from the list provided by the driver
+    client_options.compressors = Some(vec![
+        Compressor::Snappy,
+        Compressor::Zlib {
+            level: Default::default(),
+        },
+        Compressor::Zstd {
+            level: Default::default(),
+        },
+    ]);
+
+    let client = Client::with_options(client_options).unwrap();
+    let database = client.database("hello_world");
+
+    let router = Router::new()
+        .hoop(salvo::affix::inject(database))
+        .push(Router::with_path("db").get(world_row))
+        .push(Router::with_path("fortunes").get(fortunes))
+        .push(Router::with_path("queries").get(queries))
+        .push(Router::with_path("updates").get(updates));
+
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
+}

+ 153 - 0
frameworks/Rust/salvo/src/main_mongo_raw.rs

@@ -0,0 +1,153 @@
+// #[global_allocator]
+// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+use std::cmp;
+use std::thread::available_parallelism;
+use std::time::Duration;
+
+use anyhow::Error;
+use bytes::Bytes;
+use dotenv::dotenv;
+use mongodb::{
+    options::{ClientOptions, Compressor},
+    Client, Database,
+};
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
+use salvo::conn::tcp::TcpAcceptor;
+use salvo::http::header::{self, HeaderValue};
+use salvo::http::ResBody;
+use salvo::prelude::*;
+
+mod db_mongo_raw;
+mod models_mongo;
+mod utils;
+
+use db_mongo_raw::*;
+use models_mongo::*;
+
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+
+#[handler]
+async fn world_row(res: &mut Response, depot: &mut Depot) -> Result<(), Error> {
+    let mut rng = SmallRng::from_entropy();
+    let random_id = rng.gen_range(1..10_001);
+
+    let db = depot.obtain::<Database>().unwrap();
+    let world = find_world_by_id(db.clone(), random_id).await?;
+
+    let data = serde_json::to_vec(&world).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn queries(req: &mut Request, depot: &mut Depot, res: &mut Response) -> Result<(), Error> {
+    let count = req.query::<u16>("q").unwrap_or(1);
+    let count = cmp::min(500, cmp::max(1, count));
+
+    let mut rng = SmallRng::from_entropy();
+    let mut ids: Vec<i32> = Vec::with_capacity(count as usize);
+    for _ in 0..count {
+        ids.push(rng.gen_range(1..10_001));
+    }
+    let db = depot.obtain::<Database>().unwrap();
+    let worlds = find_worlds(db.clone(), ids).await?;
+
+    let data = serde_json::to_vec(&worlds)?;
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn updates(req: &mut Request, depot: &mut Depot, res: &mut Response) -> Result<(), Error> {
+    let count = req.query::<u16>("q").unwrap_or(1);
+    let count = cmp::min(500, cmp::max(1, count));
+
+    let mut rng = SmallRng::from_entropy();
+
+    let mut ids: Vec<i32> = Vec::with_capacity(count as usize);
+    for _ in 0..count {
+        ids.push(rng.gen_range(1..10_001));
+    }
+
+    let db = depot.obtain::<Database>().unwrap();
+    let mut worlds = find_worlds(db.clone(), ids).await?;
+    for world in &mut worlds {
+        world.random_number = rng.gen_range(1..10_001);
+    }
+
+    let data = serde_json::to_vec(&worlds)?;
+    update_worlds(db.clone(), worlds).await?;
+
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+fn main() {
+    dotenv().ok();
+
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    let thread_count = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    for _ in 1..thread_count {
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve());
+        });
+    }
+    println!("Started http server: 127.0.0.1:8080");
+    rt.block_on(serve());
+}
+
+async fn serve() {
+    let db_url: String = utils::get_env_var("TECHEMPOWER_MONGODB_URL");
+    let max_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MAX_POOL_SIZE");
+    let min_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MIN_POOL_SIZE");
+
+    let mut client_options = ClientOptions::parse(db_url).await.unwrap();
+    client_options.max_pool_size = Some(max_pool_size);
+    client_options.min_pool_size = Some(min_pool_size);
+    client_options.connect_timeout = Some(Duration::from_millis(200));
+
+    // the server will select the algorithm it supports from the list provided by the driver
+    client_options.compressors = Some(vec![
+        Compressor::Snappy,
+        Compressor::Zlib {
+            level: Default::default(),
+        },
+        Compressor::Zstd {
+            level: Default::default(),
+        },
+    ]);
+
+    let client = Client::with_options(client_options).unwrap();
+    let database = client.database("hello_world");
+
+    let router = Router::new()
+        .hoop(salvo::affix::inject(database))
+        .push(Router::with_path("db").get(world_row))
+        .push(Router::with_path("queries").get(queries))
+        .push(Router::with_path("updates").get(updates));
+
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
+}

+ 27 - 48
frameworks/Rust/salvo/src/main_pg.rs

@@ -1,11 +1,10 @@
 // #[global_allocator]
 // static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-#[global_allocator]
-static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::cmp;
 use std::fmt::Write;
-use std::sync::Arc;
 use std::thread::available_parallelism;
 
 use async_trait::async_trait;
@@ -14,14 +13,13 @@ use salvo::conn::tcp::TcpAcceptor;
 use salvo::http::header::{self, HeaderValue};
 use salvo::http::ResBody;
 use salvo::prelude::*;
+use dotenv::dotenv;
 use salvo::routing::FlowCtrl;
 
-mod models;
-mod pg_conn;
+mod db_pg;
+mod models_pg;
 mod utils;
-use pg_conn::PgConnection;
-
-const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+use db_pg::PgConnection;
 
 static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
 static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
@@ -32,22 +30,17 @@ struct WorldHandler {
 }
 impl WorldHandler {
     async fn new() -> Self {
+        let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
         Self {
-            conn: PgConnection::create(DB_URL)
+            conn: PgConnection::create(&db_url)
                 .await
-                .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
+                .unwrap_or_else(|_| panic!("Error connecting to {}", &db_url)),
         }
     }
 }
 #[async_trait]
 impl Handler for WorldHandler {
-    async fn handle(
-        &self,
-        _req: &mut Request,
-        _depot: &mut Depot,
-        res: &mut Response,
-        _ctrl: &mut FlowCtrl,
-    ) {
+    async fn handle(&self, _req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
         let world = self.conn.get_world().await.unwrap();
         let data = serde_json::to_vec(&world).unwrap();
         let headers = res.headers_mut();
@@ -61,22 +54,17 @@ struct WorldsHandler {
 }
 impl WorldsHandler {
     async fn new() -> Self {
+        let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
         Self {
-            conn: PgConnection::create(DB_URL)
+            conn: PgConnection::create(&db_url)
                 .await
-                .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
+                .unwrap_or_else(|_| panic!("Error connecting to {}", &db_url)),
         }
     }
 }
 #[async_trait]
 impl Handler for WorldsHandler {
-    async fn handle(
-        &self,
-        req: &mut Request,
-        _depot: &mut Depot,
-        res: &mut Response,
-        _ctrl: &mut FlowCtrl,
-    ) {
+    async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
         let count = req.query::<u16>("q").unwrap_or(1);
         let count = cmp::min(500, cmp::max(1, count));
         let worlds = self.conn.get_worlds(count).await.unwrap();
@@ -93,26 +81,20 @@ struct UpdatesHandler {
 }
 impl UpdatesHandler {
     async fn new() -> Self {
+        let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
         Self {
-            conn: PgConnection::create(DB_URL)
+            conn: PgConnection::create(&db_url)
                 .await
-                .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
+                .unwrap_or_else(|_| panic!("Error connecting to {}", &db_url)),
         }
     }
 }
 #[async_trait]
 impl Handler for UpdatesHandler {
-    async fn handle(
-        &self,
-        req: &mut Request,
-        _depot: &mut Depot,
-        res: &mut Response,
-        _ctrl: &mut FlowCtrl,
-    ) {
+    async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
         let count = req.query::<u16>("q").unwrap_or(1);
         let count = cmp::min(500, cmp::max(1, count));
-        res.headers_mut()
-            .insert(header::SERVER, SERVER_HEADER.clone());
+        res.headers_mut().insert(header::SERVER, SERVER_HEADER.clone());
         let worlds = self.conn.update(count).await.unwrap();
 
         let data = serde_json::to_vec(&worlds).unwrap();
@@ -127,22 +109,17 @@ struct FortunesHandler {
 }
 impl FortunesHandler {
     async fn new() -> Self {
+        let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
         Self {
-            conn: PgConnection::create(DB_URL)
+            conn: PgConnection::create(&db_url)
                 .await
-                .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)),
+                .unwrap_or_else(|_| panic!("Error connecting to {}", &db_url)),
         }
     }
 }
 #[async_trait]
 impl Handler for FortunesHandler {
-    async fn handle(
-        &self,
-        _req: &mut Request,
-        _depot: &mut Depot,
-        res: &mut Response,
-        _ctrl: &mut FlowCtrl,
-    ) {
+    async fn handle(&self, _req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
         let mut data = String::new();
         write!(&mut data, "{}", self.conn.tell_fortune().await.unwrap()).unwrap();
 
@@ -154,12 +131,14 @@ impl Handler for FortunesHandler {
 }
 
 fn main() {
-    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    dotenv().ok();
+    
+    let thread_count = available_parallelism().map(|n| n.get()).unwrap_or(16);
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
         .unwrap();
-    for _ in 1..size {
+    for _ in 1..thread_count {
         std::thread::spawn(move || {
             let rt = tokio::runtime::Builder::new_current_thread()
                 .enable_all()

+ 197 - 0
frameworks/Rust/salvo/src/main_pg_pool.rs

@@ -0,0 +1,197 @@
+// #[global_allocator]
+// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+use std::cmp;
+use std::fmt::Write;
+use std::sync::Arc;
+use std::thread::available_parallelism;
+
+use anyhow::Error;
+use bytes::Bytes;
+use deadpool_postgres::Pool;
+use futures_util::{stream::FuturesUnordered, TryStreamExt};
+use once_cell::sync::OnceCell;
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
+use salvo::conn::tcp::TcpAcceptor;
+use salvo::http::header::{self, HeaderValue};
+use salvo::http::ResBody;
+use salvo::prelude::*;
+use dotenv::dotenv;
+
+mod db_pg_pool;
+mod models_pg_pool;
+mod utils;
+
+use db_pg_pool::*;
+use models_pg_pool::*;
+
+static DB_POOL: OnceCell<Pool> = OnceCell::new();
+
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8");
+
+fn pool() -> &'static Pool {
+    unsafe { DB_POOL.get_unchecked() }
+}
+#[handler]
+async fn world_row(res: &mut Response) -> Result<(), Error> {
+    let mut rng = SmallRng::from_entropy();
+    let random_id = rng.gen_range(1..10_001);
+
+    let client = pool().get().await?;
+    let select = prepare_fetch_world_by_id_statement(&client).await;
+    let world = fetch_world_by_id(&client, random_id, &select).await?;
+
+    let data = serde_json::to_vec(&world).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
+    let count = req.query::<u16>("q").unwrap_or(1);
+    let count = cmp::min(500, cmp::max(1, count));
+
+    let mut rng = SmallRng::from_entropy();
+    let client = pool().get().await?;
+    let select = prepare_fetch_world_by_id_statement(&client).await;
+    let future_worlds = FuturesUnordered::new();
+
+    for _ in 0..count {
+        let w_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
+        future_worlds.push(fetch_world_by_id(&client, w_id, &select));
+    }
+    let worlds: Vec<World> = future_worlds.try_collect().await?;
+
+    let data = serde_json::to_vec(&worlds)?;
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn updates(req: &mut Request, res: &mut Response) -> Result<(), Error> {
+    let count = req.query::<u16>("q").unwrap_or(1);
+    let count = cmp::min(500, cmp::max(1, count));
+
+    let mut rng = SmallRng::from_entropy();
+    let client = pool().get().await?;
+    let select = prepare_fetch_world_by_id_statement(&client).await;
+
+    let future_worlds = FuturesUnordered::new();
+
+    for _ in 0..count {
+        let query_id = rng.gen_range(1..10_001);
+        future_worlds.push(fetch_world_by_id(&client, query_id, &select));
+    }
+
+    let worlds: Vec<World> = future_worlds.try_collect().await?;
+    let update = prepare_update_world_by_id_statement(&client).await;
+
+    let future_world_updates = FuturesUnordered::new();
+    for w in &worlds {
+        let random_id = rng.gen_range(1..10_001);
+        let w_id = w.id;
+        future_world_updates.push(update_world(&client, &update, random_id, w_id));
+    }
+    let _world_updates: Vec<u64> = future_world_updates.try_collect().await?;
+
+    let data = serde_json::to_vec(&worlds)?;
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn fortunes(res: &mut Response) -> Result<(), Error> {
+    let client = pool().get().await?;
+    let select = prepare_fetch_all_fortunes_statement(&client).await;
+    let mut items = fetch_all_fortunes(client, &select).await?;
+    items.push(Fortune {
+        id: 0,
+        message: "Additional fortune added at request time.".to_string(),
+    });
+    items.sort_by(|a, b| a.message.cmp(&b.message));
+
+    let mut data = String::new();
+    write!(&mut data, "{}", FortunesTemplate { items }).unwrap();
+
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, HTML_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+markup::define! {
+    FortunesTemplate(items: Vec<Fortune>) {
+        {markup::doctype()}
+        html {
+            head {
+                title { "Fortunes" }
+            }
+            body {
+                table {
+                    tr { th { "id" } th { "message" } }
+                    @for item in items {
+                        tr {
+                            td { {item.id} }
+                            td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+fn main() {
+    dotenv().ok();
+    
+    let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
+    let max_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MAX_POOL_SIZE");
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    rt.block_on(async {
+        DB_POOL.set(create_pool(db_url, max_pool_size).await).ok();
+    });
+
+    let router = Arc::new(
+        Router::new()
+            .push(Router::with_path("db").get(world_row))
+            .push(Router::with_path("fortunes").get(fortunes))
+            .push(Router::with_path("queries").get(queries))
+            .push(Router::with_path("updates").get(updates)),
+    );
+    let thread_count = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    for _ in 1..thread_count {
+        let router = router.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve(router));
+        });
+    }
+    println!("Started http server: 127.0.0.1:8080");
+    rt.block_on(serve(router));
+}
+
+async fn serve(router: Arc<Router>) {
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
+}

+ 135 - 0
frameworks/Rust/salvo/src/main_sqlx.rs

@@ -0,0 +1,135 @@
+// #[global_allocator]
+// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+// #[global_allocator]
+// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+use std::fmt::Write;
+use std::sync::Arc;
+use std::thread::available_parallelism;
+
+use anyhow::Error;
+use bytes::Bytes;
+use once_cell::sync::OnceCell;
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
+use salvo::conn::tcp::TcpAcceptor;
+use salvo::http::header::{self, HeaderValue};
+use salvo::http::ResBody;
+use salvo::prelude::*;
+use sqlx::PgPool;
+use dotenv::dotenv;
+
+mod db_sqlx;
+mod models_sqlx;
+mod utils;
+
+use db_sqlx::*;
+use models_sqlx::*;
+
+static DB_POOL: OnceCell<PgPool> = OnceCell::new();
+
+static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo");
+static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json");
+static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8");
+
+fn pool() -> &'static PgPool {
+    unsafe { DB_POOL.get_unchecked() }
+}
+#[handler]
+async fn world_row(res: &mut Response) -> Result<(), Error> {
+    let mut rng = SmallRng::from_entropy();
+    let random_id = rng.gen_range(1..10_001);
+
+    let conn = pool().acquire().await?;
+    let world = fetch_world(conn, random_id).await?;
+
+    let data = serde_json::to_vec(&world).unwrap();
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+#[handler]
+async fn fortunes(res: &mut Response) -> Result<(), Error> {
+    let conn = pool().acquire().await?;
+    let mut items = fetch_fortunes(conn).await?;
+    items.push(Fortune {
+        id: 0,
+        message: "Additional fortune added at request time.".to_string(),
+    });
+    items.sort_by(|a, b| a.message.cmp(&b.message));
+
+    let mut data = String::new();
+    write!(&mut data, "{}", FortunesTemplate { items }).unwrap();
+
+    let headers = res.headers_mut();
+    headers.insert(header::SERVER, SERVER_HEADER.clone());
+    headers.insert(header::CONTENT_TYPE, HTML_HEADER.clone());
+    res.set_body(ResBody::Once(Bytes::from(data)));
+    Ok(())
+}
+
+markup::define! {
+    FortunesTemplate(items: Vec<Fortune>) {
+        {markup::doctype()}
+        html {
+            head {
+                title { "Fortunes" }
+            }
+            body {
+                table {
+                    tr { th { "id" } th { "message" } }
+                    @for item in items {
+                        tr {
+                            td { {item.id} }
+                            td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+fn main() {
+    dotenv().ok();
+
+    let db_url: String = utils::get_env_var("TECHEMPOWER_POSTGRES_URL");
+    let max_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MAX_POOL_SIZE");
+    let min_pool_size: u32 = utils::get_env_var("TECHEMPOWER_MIN_POOL_SIZE");
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    rt.block_on(async {
+        DB_POOL
+            .set(create_pool(db_url, max_pool_size, min_pool_size).await)
+            .ok();
+    });
+
+    let router = Arc::new(
+        Router::new()
+            .push(Router::with_path("db").get(world_row))
+            .push(Router::with_path("fortunes").get(fortunes)),
+    );
+    let thread_count = available_parallelism().map(|n| n.get()).unwrap_or(16);
+    for _ in 1..thread_count {
+        let router = router.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(serve(router));
+        });
+    }
+    println!("Started http server: 127.0.0.1:8080");
+    rt.block_on(serve(router));
+}
+
+async fn serve(router: Arc<Router>) {
+    let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap();
+    Server::new(acceptor).serve(router).await
+}

+ 1 - 6
frameworks/Rust/salvo/src/models.rs → frameworks/Rust/salvo/src/models_diesel.rs

@@ -1,10 +1,5 @@
 use diesel::Queryable;
-use serde::{Deserialize, Serialize};
-
-#[derive(Serialize, Deserialize)]
-pub struct Message {
-    pub message: &'static str,
-}
+use serde::Serialize;
 
 #[allow(non_snake_case)]
 #[derive(Serialize, Queryable, Clone, Debug)]

+ 14 - 0
frameworks/Rust/salvo/src/models_mongo.rs

@@ -0,0 +1,14 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct Fortune {
+    pub id: i32,
+    pub message: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct World {
+    pub id: i32,
+    #[serde(rename = "randomNumber")]
+    pub random_number: i32,
+}

+ 15 - 0
frameworks/Rust/salvo/src/models_pg.rs

@@ -0,0 +1,15 @@
+use serde::Serialize;
+
+#[allow(non_snake_case)]
+#[derive(Serialize, Clone, Debug)]
+pub struct World {
+    pub id: i32,
+    pub randomnumber: i32,
+}
+
+#[allow(non_snake_case)]
+#[derive(Serialize, Debug)]
+pub struct Fortune {
+    pub id: i32,
+    pub message: String,
+}

+ 19 - 0
frameworks/Rust/salvo/src/models_pg_pool.rs

@@ -0,0 +1,19 @@
+use serde::{Deserialize, Serialize};
+use tokio_pg_mapper_derive::PostgresMapper;
+
+#[allow(non_snake_case)]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, PostgresMapper)]
+#[pg_mapper(table = "Fortune")]
+pub struct Fortune {
+    pub id: i32,
+    pub message: String,
+}
+
+#[allow(non_snake_case)]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, PostgresMapper)]
+#[pg_mapper(table = "World")]
+pub struct World {
+    pub id: i32,
+    #[serde(rename = "randomNumber")]
+    pub randomnumber: i32,
+}

+ 16 - 0
frameworks/Rust/salvo/src/models_sqlx.rs

@@ -0,0 +1,16 @@
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
+pub struct Fortune {
+    pub id: i32,
+    pub message: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
+pub struct World {
+    pub id: i32,
+    #[sqlx(rename = "randomnumber")]
+    #[serde(rename = "randomNumber")]
+    pub random_number: i32,
+}

+ 12 - 0
frameworks/Rust/salvo/src/utils.rs

@@ -1,8 +1,20 @@
 use std::io;
 use std::net::{Ipv4Addr, SocketAddr};
+use std::{env, fmt::Debug, str::FromStr};
 
 use tokio::net::{TcpListener, TcpSocket};
 
+#[allow(dead_code)]
+pub fn get_env_var<T: FromStr>(key: &str) -> T
+where
+    <T as FromStr>::Err: Debug,
+{
+    env::var(key)
+        .unwrap_or_else(|_| panic!("{key} environment variable was not set"))
+        .parse::<T>()
+        .unwrap_or_else(|_| panic!("could not parse {key}"))
+}
+
 #[allow(dead_code)]
 pub fn reuse_listener() -> io::Result<TcpListener> {
     let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080));