Browse Source

Add mongodb and deadpool-postgres actix benchmarks (#7310)

Patrick Freed 3 years ago
parent
commit
453267809f

File diff suppressed because it is too large
+ 671 - 22
frameworks/Rust/actix/Cargo.lock


+ 11 - 0
frameworks/Rust/actix/Cargo.toml

@@ -19,7 +19,16 @@ path = "src/main_http.rs"
 name = "tfb-server"
 name = "tfb-server"
 path = "src/main_server.rs"
 path = "src/main_server.rs"
 
 
+[[bin]]
+name = "tfb-web-mongodb"
+path = "src/main_mongodb.rs"
+
+[[bin]]
+name = "tfb-web-pg-deadpool"
+path = "src/main_pg_deadpool.rs"
+
 [dependencies]
 [dependencies]
+anyhow = "1"
 actix = "0.12"
 actix = "0.12"
 actix-web = { version = "4.0.0-rc.3", default-features = false, features = ["macros"] }
 actix-web = { version = "4.0.0-rc.3", default-features = false, features = ["macros"] }
 actix-http = { version = "3.0.0-rc.2", default-features = false }
 actix-http = { version = "3.0.0-rc.2", default-features = false }
@@ -44,6 +53,8 @@ simd-json-derive = "0.2"
 snmalloc-rs = "0.2.6"
 snmalloc-rs = "0.2.6"
 tokio = { version = "1", features = ["full"] }
 tokio = { version = "1", features = ["full"] }
 tokio-postgres = "0.7.5"
 tokio-postgres = "0.7.5"
+deadpool-postgres = "0.10.1"
+mongodb = "2.2.0"
 url = "2.1"
 url = "2.1"
 v_htmlescape = "0.14"
 v_htmlescape = "0.14"
 yarte = { version = "0.15", features = ["bytes-buf"] }
 yarte = { version = "0.15", features = ["bytes-buf"] }

+ 7 - 3
frameworks/Rust/actix/README.md

@@ -21,11 +21,15 @@ Actix web is a small, fast, pragmatic, open source rust web framework.
 * Multipart streams
 * Multipart streams
 * Middlewares (Logger, Session, DefaultHeaders, CORS)
 * Middlewares (Logger, Session, DefaultHeaders, CORS)
 
 
-## Database
+## Databases
 
 
-PostgreSQL.
+* PostgreSQL
+  * Raw driver access via [`tokio_postgres`](https://docs.rs/tokio-postgres/latest/tokio_postgres/) (actix-http test)
+  * ORM using [`diesel`](http://diesel.rs) (actix-web-diesel test)
+  * Raw driver access via [`tokio_postgres`](https://docs.rs/tokio-postgres/latest/tokio_postgres/) with connection pooling using [`deadpool_postgres`](https://docs.rs/deadpool-postgres/latest/deadpool_postgres/) (actix-web-pg-deadpool test)
 
 
-* ORM using [diesel](http://diesel.rs)
+* MongoDB
+  * Raw driver access and connection pooling via [`mongodb`](https://docs.rs/mongodb/latest/mongodb/) (actix-web-mongodb test)
 
 
 ## Test URLs
 ## Test URLs
 
 

+ 15 - 0
frameworks/Rust/actix/actix-web-mongodb.dockerfile

@@ -0,0 +1,15 @@
+FROM rust:1.57.0
+
+ENV ACTIX_TECHEMPOWER_MONGODB_URL=mongodb://tfb-database:27017
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /actix
+WORKDIR /actix
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin tfb-web-mongodb
+
+EXPOSE 8080
+
+CMD ./target/release/tfb-web-mongodb

+ 13 - 0
frameworks/Rust/actix/actix-web-pg-deadpool.dockerfile

@@ -0,0 +1,13 @@
+FROM rust:1.57.0
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /actix
+WORKDIR /actix
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin tfb-web-pg-deadpool
+
+EXPOSE 8080
+
+CMD ./target/release/tfb-web-pg-deadpool

+ 40 - 0
frameworks/Rust/actix/benchmark_config.json

@@ -76,6 +76,46 @@
       "display_name": "actix-server",
       "display_name": "actix-server",
       "notes": "",
       "notes": "",
       "versus": ""
       "versus": ""
+    },
+    "web-mongodb": {
+      "db_url": "/db",
+      "fortune_url": "/fortunes",
+      "query_url": "/queries?q=",
+      "update_url": "/updates?q=",
+      "port": 8080,
+      "approach": "Realistic",
+      "classification": "Micro",
+      "database": "MongoDB",
+      "framework": "actix",
+      "language": "Rust",
+      "orm": "Raw",
+      "platform": "None",
+      "webserver": "actix-web",
+      "os": "Linux",
+      "database_os": "Linux",
+      "display_name": "Actix Web [MongoDB]",
+      "notes": "",
+      "versus": ""
+    },
+    "web-pg-deadpool": {
+      "db_url": "/db",
+      "fortune_url": "/fortunes",
+      "query_url": "/queries?q=",
+      "update_url": "/updates?q=",
+      "port": 8080,
+      "approach": "Realistic",
+      "classification": "Micro",
+      "database": "Postgres",
+      "framework": "actix",
+      "language": "Rust",
+      "orm": "Raw",
+      "platform": "None",
+      "webserver": "actix-web",
+      "os": "Linux",
+      "database_os": "Linux",
+      "display_name": "Actix Web [Postgres + deadpool]",
+      "notes": "",
+      "versus": ""
     }
     }
   }]
   }]
 }
 }

+ 219 - 0
frameworks/Rust/actix/src/main_mongodb.rs

@@ -0,0 +1,219 @@
+mod models_mongodb;
+mod utils;
+
+use models_mongodb::{Fortune, World};
+use utils::{Queries, Result, CONNECTION_POOL_SIZE};
+
+use actix_http::{
+    header::{HeaderValue, CONTENT_TYPE, SERVER},
+    KeepAlive, StatusCode,
+};
+use actix_web::{web, App, HttpResponse, HttpServer};
+use anyhow::bail;
+use futures::{stream::FuturesUnordered, TryStreamExt};
+use mongodb::bson::doc;
+use mongodb::{options::ClientOptions, Client};
+use rand::{prelude::SmallRng, Rng, SeedableRng};
+use tokio::runtime::Handle;
+use yarte::ywrite_html;
+
+use std::time::Duration;
+
+struct Data {
+    client: Client,
+    tokio_runtime: tokio::runtime::Handle,
+}
+
+async fn find_random_world(data: web::Data<Data>) -> Result<World> {
+    let runtime = data.tokio_runtime.clone();
+    runtime
+        .spawn(async move {
+            let mut rng = SmallRng::from_entropy();
+            let id = (rng.gen::<u32>() % 10_000 + 1) as i32;
+
+            let coll = data.client.database("hello_world").collection("world");
+            let world = coll
+                .find_one(doc! { "id": id as f32 }, None)
+                .await?
+                .expect("should find world");
+            Ok(world)
+        })
+        .await?
+}
+
+#[actix_web::get("/db")]
+async fn db(data: web::Data<Data>) -> Result<HttpResponse<Vec<u8>>> {
+    let world = find_random_world(data).await?;
+    let mut bytes = Vec::with_capacity(48);
+    serde_json::to_writer(&mut bytes, &world)?;
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, bytes);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut()
+        .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+
+    Ok(res)
+}
+
+async fn find_random_worlds(data: web::Data<Data>, num_of_worlds: usize) -> Result<Vec<World>> {
+    let mut futs = FuturesUnordered::new();
+    for _ in 0..num_of_worlds {
+        futs.push(find_random_world(data.clone()))
+    }
+
+    let mut worlds = Vec::with_capacity(num_of_worlds);
+    while let Some(world) = futs.try_next().await? {
+        worlds.push(world);
+    }
+
+    Ok(worlds)
+}
+
+#[actix_web::get("/queries")]
+async fn queries(
+    data: web::Data<Data>,
+    query: web::Query<Queries>,
+) -> Result<HttpResponse<Vec<u8>>> {
+    let n_queries = query.q;
+
+    let worlds = find_random_worlds(data, n_queries).await?;
+
+    let mut bytes = Vec::with_capacity(35 * n_queries);
+    serde_json::to_writer(&mut bytes, &worlds)?;
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, bytes);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut()
+        .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+
+    Ok(res)
+}
+
+#[actix_web::get("/updates")]
+async fn updates(
+    data: web::Data<Data>,
+    query: web::Query<Queries>,
+) -> Result<HttpResponse<Vec<u8>>> {
+    let tokio_runtime = data.tokio_runtime.clone();
+    let client = data.client.clone();
+
+    let mut worlds = find_random_worlds(data, query.q).await?;
+
+    let mut rng = SmallRng::from_entropy();
+    let mut updates = Vec::new();
+    for world in worlds.iter_mut() {
+        let new_random_number = (rng.gen::<u32>() % 10_000 + 1) as i32;
+        updates.push(doc! {
+            "q": { "id": world.id }, "u": { "$set": { "randomNumber": new_random_number }}
+        });
+        world.random_number = new_random_number;
+    }
+
+    tokio_runtime
+        .spawn(async move {
+            client
+                .database("hello_world")
+                .run_command(
+                    doc! {
+                        "update": "world",
+                        "updates": updates,
+                        "ordered": false,
+                    },
+                    None,
+                )
+                .await
+        })
+        .await??;
+
+    let mut bytes = Vec::with_capacity(35 * worlds.len());
+    serde_json::to_writer(&mut bytes, &worlds)?;
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, bytes);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut()
+        .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+
+    Ok(res)
+}
+
+#[actix_web::get("/fortunes")]
+async fn fortune(data: web::Data<Data>) -> Result<HttpResponse<Vec<u8>>> {
+    async fn fetch_fortunes(client: &Client) -> Result<Vec<Fortune>> {
+        let fortunes_cursor = client
+            .database("hello_world")
+            .collection::<Fortune>("fortune")
+            .find(None, None)
+            .await?;
+
+        let mut fortunes: Vec<Fortune> = fortunes_cursor.try_collect().await?;
+        fortunes.push(Fortune {
+            id: 0,
+            message: "Additional fortune added at request time.".to_string(),
+        });
+
+        fortunes.sort_by(|a, b| a.message.cmp(&b.message));
+
+        Ok(fortunes)
+    }
+
+    let d = data.clone();
+    let fortunes = data
+        .tokio_runtime
+        .spawn(async move { fetch_fortunes(&d.client).await })
+        .await??;
+
+    let mut body = Vec::with_capacity(2048);
+    ywrite_html!(body, "{{> fortune }}");
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, body);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut().insert(
+        CONTENT_TYPE,
+        HeaderValue::from_static("text/html; charset=utf-8"),
+    );
+
+    Ok(res)
+}
+
+fn main() {
+    actix_web::rt::System::with_tokio_rt(|| tokio::runtime::Runtime::new().unwrap())
+        .block_on(async_main())
+        .unwrap();
+}
+
+async fn async_main() -> Result<()> {
+    println!("Starting http server: 0.0.0.0:8080");
+
+    // use a separate, multithreaded tokio runtime for db queries for better performance
+    let handle = Handle::current();
+
+    let uri = std::env::var("ACTIX_TECHEMPOWER_MONGODB_URL")
+        .or_else(|_| bail!("missing ACTIX_TECHEMPOWER_MONGODB_URL env variable"))?;
+    let mut options = ClientOptions::parse(uri).await?;
+    options.max_pool_size = Some(CONNECTION_POOL_SIZE as u32);
+    let client = Client::with_options(options)?;
+
+    HttpServer::new(move || {
+        App::new()
+            .app_data(web::Data::new(Data {
+                client: client.clone(),
+                tokio_runtime: handle.clone(),
+            }))
+            .service(fortune)
+            .service(db)
+            .service(queries)
+            .service(updates)
+    })
+    .keep_alive(KeepAlive::Os)
+    .client_request_timeout(Duration::from_secs(0))
+    .backlog(1024)
+    .bind("0.0.0.0:8080")?
+    .run()
+    .await?;
+
+    Ok(())
+}

+ 210 - 0
frameworks/Rust/actix/src/main_pg_deadpool.rs

@@ -0,0 +1,210 @@
+mod models;
+mod utils;
+
+use std::fmt::Write;
+use std::time::Duration;
+
+use actix_http::KeepAlive;
+use actix_web::{
+    http::{
+        header::{HeaderValue, CONTENT_TYPE, SERVER},
+        StatusCode,
+    },
+    web, App, HttpResponse, HttpServer,
+};
+use deadpool_postgres::{Config, Pool, PoolConfig, Runtime};
+use futures::{stream::FuturesUnordered, TryStreamExt};
+use models::{Fortune, World};
+use rand::{prelude::SmallRng, Rng, SeedableRng};
+use tokio_postgres::{types::ToSql, NoTls};
+use utils::{Queries, Result, CONNECTION_POOL_SIZE};
+use yarte::ywrite_html;
+
+async fn find_random_world(pool: &Pool) -> Result<World> {
+    let conn = pool.get().await?;
+    let world = conn
+        .prepare("SELECT * FROM world WHERE id=$1")
+        .await
+        .unwrap();
+
+    let mut rng = SmallRng::from_entropy();
+    let id = (rng.gen::<u32>() % 10_000 + 1) as i32;
+
+    let row = conn.query_one(&world, &[&id]).await?;
+
+    Ok(World {
+        id: row.get(0),
+        randomnumber: row.get(1),
+    })
+}
+
+async fn find_random_worlds(pool: &Pool, num_of_worlds: usize) -> Result<Vec<World>> {
+    let mut futs = FuturesUnordered::new();
+    for _ in 0..num_of_worlds {
+        futs.push(find_random_world(pool));
+    }
+
+    let mut worlds = Vec::with_capacity(num_of_worlds);
+    while let Some(world) = futs.try_next().await? {
+        worlds.push(world);
+    }
+
+    Ok(worlds)
+}
+
+#[actix_web::get("/db")]
+async fn db(data: web::Data<Pool>) -> Result<HttpResponse<Vec<u8>>> {
+    let world = find_random_world(&data).await?;
+    let mut bytes = Vec::with_capacity(48);
+    serde_json::to_writer(&mut bytes, &world)?;
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, bytes);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut()
+        .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+
+    Ok(res)
+}
+
+#[actix_web::get("/queries")]
+async fn queries(
+    data: web::Data<Pool>,
+    query: web::Query<Queries>,
+) -> Result<HttpResponse<Vec<u8>>> {
+    let n_queries = query.q;
+
+    let worlds = find_random_worlds(&data, n_queries).await?;
+
+    let mut bytes = Vec::with_capacity(35 * n_queries);
+    serde_json::to_writer(&mut bytes, &worlds)?;
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, bytes);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut()
+        .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+
+    Ok(res)
+}
+
+#[actix_web::get("/updates")]
+async fn updates(
+    data: web::Data<Pool>,
+    query: web::Query<Queries>,
+) -> Result<HttpResponse<Vec<u8>>> {
+    let mut worlds = find_random_worlds(&data, query.q).await?;
+
+    let mut rng = SmallRng::from_entropy();
+
+    let mut updates = "UPDATE world SET randomnumber = CASE id ".to_string();
+    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(query.q as usize * 3);
+
+    let mut n_params = 1;
+    for world in worlds.iter_mut() {
+        let new_random_number = (rng.gen::<u32>() % 10_000 + 1) as i32;
+        write!(&mut updates, "when ${} then ${} ", n_params, n_params + 1).unwrap();
+        world.randomnumber = new_random_number;
+        n_params += 2;
+    }
+
+    // need separate loop to borrow immutably
+    for world in worlds.iter() {
+        params.push(&world.id);
+        params.push(&world.randomnumber);
+    }
+
+    updates.push_str("ELSE randomnumber END WHERE id IN (");
+    for world in worlds.iter() {
+        write!(&mut updates, "${},", n_params).unwrap();
+        params.push(&world.id);
+        n_params += 1;
+    }
+
+    updates.pop(); // drop trailing comma
+    updates.push(')');
+
+    let conn = data.get().await?;
+    let stmt = conn.prepare(&updates).await?;
+    conn.query(&stmt, &params).await?;
+
+    let mut bytes = Vec::with_capacity(35 * worlds.len());
+    serde_json::to_writer(&mut bytes, &worlds)?;
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, bytes);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut()
+        .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+
+    Ok(res)
+}
+
+#[actix_web::get("/fortunes")]
+async fn fortune(data: web::Data<Pool>) -> Result<HttpResponse<Vec<u8>>> {
+    let conn = data.get().await?;
+    let stmt = conn.prepare("SELECT * FROM Fortune").await?;
+    let params: &[&'static str] = &[];
+    let s = conn.query_raw(&stmt, params).await?;
+
+    let mut stream = Box::pin(s);
+    let mut fortunes = Vec::new();
+
+    while let Some(row) = stream.try_next().await? {
+        fortunes.push(Fortune {
+            id: row.get(0),
+            message: row.get(1),
+        });
+    }
+
+    fortunes.push(Fortune {
+        id: 0,
+        message: "Additional fortune added at request time.".to_string(),
+    });
+
+    fortunes.sort_by(|a, b| a.message.cmp(&b.message));
+
+    let mut body = Vec::with_capacity(2048);
+    ywrite_html!(body, "{{> fortune }}");
+
+    let mut res = HttpResponse::with_body(StatusCode::OK, body);
+    res.headers_mut()
+        .insert(SERVER, HeaderValue::from_static("Actix"));
+    res.headers_mut().insert(
+        CONTENT_TYPE,
+        HeaderValue::from_static("text/html; charset=utf-8"),
+    );
+
+    Ok(res)
+}
+
+#[actix_web::main]
+async fn main() -> Result<()> {
+    println!("Starting http server: 0.0.0.0:8080");
+
+    let mut cfg = Config::new();
+    cfg.host = Some("tfb-database".to_string());
+    cfg.dbname = Some("hello_world".to_string());
+    cfg.user = Some("benchmarkdbuser".to_string());
+    cfg.password = Some("benchmarkdbpass".to_string());
+    let pc = PoolConfig::new(CONNECTION_POOL_SIZE);
+    cfg.pool = pc.into();
+    let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
+
+    HttpServer::new(move || {
+        App::new()
+            .app_data(web::Data::new(pool.clone()))
+            .service(fortune)
+            .service(db)
+            .service(queries)
+            .service(updates)
+    })
+    .keep_alive(KeepAlive::Os)
+    .client_request_timeout(Duration::from_secs(0))
+    .backlog(1024)
+    .bind("0.0.0.0:8080")?
+    .run()
+    .await?;
+
+    Ok(())
+}

+ 67 - 0
frameworks/Rust/actix/src/models_mongodb.rs

@@ -0,0 +1,67 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct World {
+    pub id: i32,
+    pub random_number: i32,
+}
+
+// The ids are stored in MongoDB as floating point numbers, so need a custom deserialization implementation
+// to handle converting them.
+impl<'de> Deserialize<'de> for World {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        #[derive(Deserialize)]
+        #[serde(rename_all = "camelCase")]
+        struct FloatWorld {
+            id: f32,
+            random_number: f32,
+        }
+
+        let float = FloatWorld::deserialize(deserializer)?;
+        Ok(World {
+            id: float.id as i32,
+            random_number: float.random_number as i32,
+        })
+    }
+}
+
+#[allow(non_snake_case)]
+#[derive(Serialize, Debug)]
+pub struct Fortune {
+    pub id: i32,
+    pub message: String,
+}
+
+// The ids are stored in MongoDB as floating point numbers, so need a custom deserialization implementation
+// to handle converting them.
+impl<'de> Deserialize<'de> for Fortune {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        #[derive(Deserialize)]
+        struct FloatFortune {
+            id: f32,
+            message: String,
+        }
+
+        let float = FloatFortune::deserialize(deserializer)?;
+        Ok(Fortune {
+            id: float.id as i32,
+            message: float.message,
+        })
+    }
+}
+
+impl Default for Fortune {
+    fn default() -> Self {
+        Fortune {
+            id: -1,
+            message: "".to_string(),
+        }
+    }
+}

+ 53 - 1
frameworks/Rust/actix/src/utils.rs

@@ -1,6 +1,6 @@
 #![allow(dead_code, unused_braces)]
 #![allow(dead_code, unused_braces)]
 
 
-use std::{borrow::Cow, cmp, io};
+use std::{borrow::Cow, cmp, fmt::Display, io};
 
 
 use bytes::BufMut;
 use bytes::BufMut;
 use serde::{Deserialize, Serialize};
 use serde::{Deserialize, Serialize};
@@ -44,3 +44,55 @@ pub fn get_query_param(query: &str) -> u16 {
     };
     };
     cmp::min(500, cmp::max(1, q))
     cmp::min(500, cmp::max(1, q))
 }
 }
+
+pub const CONNECTION_POOL_SIZE: usize = 40;
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Debug)]
+pub struct Error {
+    err: anyhow::Error,
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.err.fmt(f)
+    }
+}
+
+impl actix_web::error::ResponseError for Error {}
+
+impl<T> From<T> for Error
+where
+    T: Into<anyhow::Error>,
+{
+    fn from(e: T) -> Self {
+        Error { err: e.into() }
+    }
+}
+
+pub struct Queries {
+    pub q: usize,
+}
+
+impl<'de> Deserialize<'de> for Queries {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        #[derive(Debug, Deserialize)]
+        struct Q {
+            q: Option<String>,
+        }
+
+        let q = Q::deserialize(deserializer)?;
+        let n = match q.q {
+            Some(s) => {
+                let i: i32 = s.parse().unwrap_or(1);
+                std::cmp::max(1, std::cmp::min(500, i)) as usize
+            }
+            None => 1,
+        };
+        Ok(Queries { q: n })
+    }
+}

Some files were not shown because too many files changed in this diff