Browse Source

feat: upgrade axum & dependencies, refactor, and add code and build optimisations (#9237)

Andrew James 11 months ago
parent
commit
44a869e49f
38 changed files with 1286 additions and 1095 deletions
  1. 372 228
      frameworks/Rust/axum/Cargo.lock
  2. 55 20
      frameworks/Rust/axum/Cargo.toml
  3. 37 33
      frameworks/Rust/axum/README.md
  4. 0 25
      frameworks/Rust/axum/axum-mongo-raw.dockerfile
  5. 0 25
      frameworks/Rust/axum/axum-mongo.dockerfile
  6. 0 24
      frameworks/Rust/axum/axum-pg-pool.dockerfile
  7. 0 23
      frameworks/Rust/axum/axum-pg.dockerfile
  8. 0 26
      frameworks/Rust/axum/axum-sqlx.dockerfile
  9. 15 12
      frameworks/Rust/axum/axum.dockerfile
  10. 13 0
      frameworks/Rust/axum/benchmark_config.json
  11. 2 1
      frameworks/Rust/axum/config.toml
  12. 51 0
      frameworks/Rust/axum/src/common/mod.rs
  13. 5 0
      frameworks/Rust/axum/src/common/models.rs
  14. 152 0
      frameworks/Rust/axum/src/common/simd_json.rs
  15. 3 21
      frameworks/Rust/axum/src/common/utils.rs
  16. 0 199
      frameworks/Rust/axum/src/database_pg.rs
  17. 0 86
      frameworks/Rust/axum/src/database_sqlx.rs
  18. 16 26
      frameworks/Rust/axum/src/main.rs
  19. 30 46
      frameworks/Rust/axum/src/main_mongo.rs
  20. 30 50
      frameworks/Rust/axum/src/main_mongo_raw.rs
  21. 36 54
      frameworks/Rust/axum/src/main_pg.rs
  22. 57 79
      frameworks/Rust/axum/src/main_pg_pool.rs
  23. 100 43
      frameworks/Rust/axum/src/main_sqlx.rs
  24. 0 6
      frameworks/Rust/axum/src/models_common.rs
  25. 2 1
      frameworks/Rust/axum/src/mongo/database.rs
  26. 1 0
      frameworks/Rust/axum/src/mongo/mod.rs
  27. 2 1
      frameworks/Rust/axum/src/mongo_raw/database.rs
  28. 1 0
      frameworks/Rust/axum/src/mongo_raw/mod.rs
  29. 155 0
      frameworks/Rust/axum/src/pg/database.rs
  30. 2 0
      frameworks/Rust/axum/src/pg/mod.rs
  31. 0 0
      frameworks/Rust/axum/src/pg/models.rs
  32. 15 41
      frameworks/Rust/axum/src/pg_pool/database.rs
  33. 2 0
      frameworks/Rust/axum/src/pg_pool/mod.rs
  34. 0 0
      frameworks/Rust/axum/src/pg_pool/models.rs
  35. 94 25
      frameworks/Rust/axum/src/server.rs
  36. 36 0
      frameworks/Rust/axum/src/sqlx/database.rs
  37. 2 0
      frameworks/Rust/axum/src/sqlx/mod.rs
  38. 0 0
      frameworks/Rust/axum/src/sqlx/models.rs

File diff suppressed because it is too large
+ 372 - 228
frameworks/Rust/axum/Cargo.lock


+ 55 - 20
frameworks/Rust/axum/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
-name = "axum"
-version = "0.2.0"
+name = "axum-techempower"
+version = "0.2.1"
 authors = ["Dragos Varovici <[email protected]>"]
 edition = "2021"
 
@@ -28,28 +28,63 @@ path = "src/main_mongo_raw.rs"
 name = "axum-pg"
 path = "src/main_pg.rs"
 
+[features]
+default = []
+simd-json = [
+    "dep:simd-json",
+    "dep:axum-core",
+    "dep:mime",
+    "dep:bytes",
+    "dep:serde_path_to_error",
+]
+
 [dependencies]
-axum = { version = "0.6.16", default-features = false, features = ["json", "query", "http1", "tokio"] }
-deadpool = { version = "0.10.0", features = ["rt_tokio_1", "serde", "async-trait", "managed" ] }
-deadpool-postgres = "0.12.1"
+axum = { version = "0.7.5", default-features = false, features = [
+    "json",
+    "query",
+    "http1",
+    "tokio",
+] }
+deadpool = { version = "0.12.1", features = ["rt_tokio_1", "serde", "managed"] }
+deadpool-postgres = { version = "0.14.0", features = ["rt_tokio_1", "serde"] }
 dotenv = "0.15.0"
-futures = "0.3.25"
-futures-util = "0.3.25"
-hyper = { version = "0.14.23", features = ["http1", "server"] }
-mongodb = { version = "2.3.1", features = ["zstd-compression", "snappy-compression", "zlib-compression"] }
-num_cpus = "1.14.0"
+futures = "0.3.30"
+futures-util = "0.3.30"
+mongodb = { version = "2.8.0", features = [
+    "zstd-compression",
+    "snappy-compression",
+    "zlib-compression",
+] }
+num_cpus = "1.16.0"
 rand = { version = "0.8.5", features = ["small_rng"] }
-serde = { version = "1.0.149", features = ["derive"] }
-serde_json = "1.0.89"
-sqlx = { version = "0.7.3", features = ["postgres", "macros", "runtime-tokio-native-tls"] }
-tokio = { version = "1.24.2", features = ["full"] }
-tokio-pg-mapper = "0.2.0"
-tokio-pg-mapper-derive = "0.2.0"
-tokio-postgres = "0.7.7"
-tower = { version = "0.4.13", features = ["util"] }
-tower-http = { version = "0.4.0", features = ["set-header"] }
+serde = { version = "1.0.196", features = ["derive"] }
+serde_json = "1.0.127"
+sqlx = { version = "0.7.3", features = [
+    "postgres",
+    "macros",
+    "runtime-tokio",
+    "tls-rustls",
+] }
+tokio = { version = "1.39.3", features = ["full"] }
+tokio-pg-mapper = { version = "0.2.0" }
+tokio-pg-mapper-derive = { version = "0.2.0" }
+tokio-postgres = { version = "0.7.11" }
+tower = { version = "0.5.0", features = ["util"] }
+tower-http = { version = "0.5.2", features = ["set-header"] }
 yarte = "0.15.7"
+simd-json = { version = "0.13.8", optional = true }
+axum-core = { version = "0.4.3", optional = true }
+mime = { version = "0.3.17", optional = true }
+bytes = { version = "1.5.0", optional = true }
+serde_path_to_error = { version = "0.1.15", optional = true }
+moka = { version = "0.12.8", features = ["future"] }
+socket2 = "0.5.7"
+hyper = { version = "1.4", features = ["server", "http1"] }
+hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
+
 
 [profile.release]
-lto = true
+lto = "fat"
 codegen-units = 1
+strip = true
+opt-level = 3

+ 37 - 33
frameworks/Rust/axum/README.md

@@ -1,42 +1,46 @@
-
-# [Axum](https://github.com/tokio-rs/axum) - Ergonomic and modular web framework built with Tokio, Tower, and Hyper
+# [Axum](https://github.com/tokio-rs/axum)
 
 ## Description
 
-Axum is a web application framework that focuses on ergonomics and modularity.
-
-* [User Guide](https://docs.rs/axum/0.3/axum/)
-* [API Documentation](https://docs.rs/axum/0.3/axum/)
-* Cargo package: [axum](https://crates.io/crates/axum)
+Axum is a web application framework that focuses on ergonomics and modularity,
+built with Tokio, Tower, and Hyper.
 
-## Database
+- [User Guide](https://docs.rs/axum/latest/axum/)
+- [API Documentation](https://docs.rs/axum/latest/axum/)
+- [Cargo Package (`axum`)](https://crates.io/crates/axum)
 
-PostgreSQL
+## Variants
 
-* Raw using [sqlx](https://github.com/launchbadge/sqlx)
+- PostgreSQL using `SQLx`, `tokio_postgres`, and `deadpool`.
+- MongoDB with `mongodb`.
 
 ## Test URLs
 
-### Test 1: JSON Encoding
-
-    http://localhost:8000/json
-
-### Test 2: Single Row Query
-
-    http://localhost:8000/db
-
-### Test 3: Multi Row Query
-
-    http://localhost:8000/queries?q=20
-
-### Test 4: Fortunes (Template rendering)
-
-    http://localhost:8000/fortunes
-
-### Test 5: Update Query
-
-    http://localhost:8000/updates?q=20
-
-### Test 6: Plaintext
-
-    http://localhost:8000/plaintext
+- Plaintext: http://localhost:8000/plaintext
+- JSON Encoding: http://localhost:8000/json
+- Single Row Query: http://localhost:8000/db
+- Multi Row Query: http://localhost:8000/queries?q=20
+- Fortunes: http://localhost:8000/fortunes
+- Update Query: http://localhost:8000/updates?q=20
+- Cached Query: http://localhost:8000/cached-queries?q=20
+
+## Notable Points (both performance and build)
+
+- Use of `async`.
+- Use of most recent versions of Rust, `axum` and dependencies.
+- (Disabled by default) Compile-time swap-in of `simd-json` instead of `serde_json` for faster JSON serialization.
+- Release binaries are stripped and compiled with CPU native.
+- Sockets configured with TCP_NODELAY and to support an increased number of pending connections.
+- Server configured to serve HTTP/1 only, with no need for websockets.
+- Separation of build and deployment containers using multi-stage builds.
+- Deployment into Google's minimal `distroless-cc` container.
+- Use of pipelined database queries (where supported).
+- Streaming database queries (where supported).
+- Use of PostgreSQL prepared statements cache (where supported).
+- Use of PostgreSQL arrays to execute multi-row database updates with a single `UPDATE` query.
+  - This is permitted by the [test requirements](https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview#database-updates), step (ix).
+- In version 0.7.6 (as yet unreleased), a native API to set TCP_NODELAY will be included.
+  - https://github.com/tokio-rs/axum/pull/2653/
+  - https://github.com/tokio-rs/axum/issues/2521
+- More performance improvements are to be expected in version 0.8:
+  - https://github.com/tokio-rs/axum/issues/1827

+ 0 - 25
frameworks/Rust/axum/axum-mongo-raw.dockerfile

@@ -1,25 +0,0 @@
-FROM rust:1.75-slim-buster
-
-ENV AXUM_TECHEMPOWER_MONGODB_URL=mongodb://tfb-database:27017
-ENV AXUM_TECHEMPOWER_MAX_POOL_SIZE=28
-ENV AXUM_TECHEMPOWER_MIN_POOL_SIZE=14
-
-RUN apt-get update && apt-get install -y --no-install-recommends \
-    pkg-config libssl-dev \
-    && rm -rf /var/lib/apt/lists/*
-
-WORKDIR /axum
-COPY ./src ./src
-COPY ./templates ./templates
-COPY ./Cargo.toml ./Cargo.toml
-COPY ./Cargo.lock ./Cargo.lock
-COPY ./run.sh ./run.sh
-RUN chmod +x ./run.sh
-
-ENV RUSTFLAGS "-C target-cpu=native"
-RUN cargo build --release
-RUN cp ./target/release/axum-mongo-raw ./target/release/axum-techempower
-
-EXPOSE 8000
-
-CMD ["./run.sh"]

+ 0 - 25
frameworks/Rust/axum/axum-mongo.dockerfile

@@ -1,25 +0,0 @@
-FROM rust:1.75-slim-buster
-
-ENV AXUM_TECHEMPOWER_MONGODB_URL=mongodb://tfb-database:27017
-ENV AXUM_TECHEMPOWER_MAX_POOL_SIZE=28
-ENV AXUM_TECHEMPOWER_MIN_POOL_SIZE=14
-
-RUN apt-get update && apt-get install -y --no-install-recommends \
-    pkg-config libssl-dev \
-    && rm -rf /var/lib/apt/lists/*
-
-WORKDIR /axum
-COPY ./src ./src
-COPY ./templates ./templates
-COPY ./Cargo.toml ./Cargo.toml
-COPY ./Cargo.lock ./Cargo.lock
-COPY ./run.sh ./run.sh
-RUN chmod +x ./run.sh
-
-ENV RUSTFLAGS "-C target-cpu=native"
-RUN cargo build --release
-RUN cp ./target/release/axum-mongo ./target/release/axum-techempower
-
-EXPOSE 8000
-
-CMD ["./run.sh"]

+ 0 - 24
frameworks/Rust/axum/axum-pg-pool.dockerfile

@@ -1,24 +0,0 @@
-FROM rust:1.75-slim-buster
-
-ENV AXUM_TECHEMPOWER_DATABASE_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
-ENV AXUM_TECHEMPOWER_MAX_POOL_SIZE=28
-
-RUN apt-get update && apt-get install -y --no-install-recommends \
-    libpq-dev pkg-config libssl-dev \
-    && rm -rf /var/lib/apt/lists/*
-
-WORKDIR /axum
-COPY ./src ./src
-COPY ./templates ./templates
-COPY ./Cargo.toml ./Cargo.toml
-COPY ./Cargo.lock ./Cargo.lock
-COPY ./run.sh ./run.sh
-RUN chmod +x ./run.sh
-
-ENV RUSTFLAGS "-C target-cpu=native"
-RUN cargo build --release
-RUN cp ./target/release/axum-pg-pool ./target/release/axum-techempower
-
-EXPOSE 8000
-
-CMD ["./run.sh"]

+ 0 - 23
frameworks/Rust/axum/axum-pg.dockerfile

@@ -1,23 +0,0 @@
-FROM rust:1.75-slim-buster
-
-ENV AXUM_TECHEMPOWER_DATABASE_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
-
-RUN apt-get update && apt-get install -y --no-install-recommends \
-    libpq-dev pkg-config libssl-dev \
-    && rm -rf /var/lib/apt/lists/*
-
-WORKDIR /axum
-COPY ./src ./src
-COPY ./templates ./templates
-COPY ./Cargo.toml ./Cargo.toml
-COPY ./Cargo.lock ./Cargo.lock
-COPY ./run.sh ./run.sh
-RUN chmod +x ./run.sh
-
-ENV RUSTFLAGS "-C target-cpu=native"
-RUN cargo build --release
-RUN cp ./target/release/axum-pg ./target/release/axum-techempower
-
-EXPOSE 8000
-
-CMD ["./run.sh"]

+ 0 - 26
frameworks/Rust/axum/axum-sqlx.dockerfile

@@ -1,26 +0,0 @@
-FROM rust:1.75-slim-buster
-
-ENV AXUM_TECHEMPOWER_DATABASE_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
-ENV AXUM_TECHEMPOWER_MAX_POOL_SIZE=56
-ENV AXUM_TECHEMPOWER_MIN_POOL_SIZE=56
-
-RUN apt-get update && apt-get install -y --no-install-recommends \
-    libpq-dev pkg-config libssl-dev \
-    && rm -rf /var/lib/apt/lists/*
-
-WORKDIR /axum
-COPY ./src ./src
-COPY ./templates ./templates
-COPY ./Cargo.toml ./Cargo.toml
-COPY ./Cargo.lock ./Cargo.lock
-COPY ./run.sh ./run.sh
-RUN chmod +x ./run.sh
-
-ENV RUSTFLAGS "-C target-cpu=native"
-RUN cargo build --release
-RUN cp ./target/release/axum-sqlx ./target/release/axum-techempower
-
-
-EXPOSE 8000
-
-CMD ["./run.sh"]

+ 15 - 12
frameworks/Rust/axum/axum.dockerfile

@@ -1,21 +1,24 @@
-FROM rust:1.75-slim-buster
+FROM docker.io/rust:1.80-slim-bookworm AS builder
 
 RUN apt-get update && apt-get install -y --no-install-recommends \
     pkg-config libssl-dev \
     && rm -rf /var/lib/apt/lists/*
 
-WORKDIR /axum
-COPY ./src ./src
-COPY ./templates ./templates
-COPY ./Cargo.toml ./Cargo.toml
-COPY ./Cargo.lock ./Cargo.lock
-COPY ./run.sh ./run.sh
-RUN chmod +x ./run.sh
-
+WORKDIR /build
+COPY ./Cargo.toml ./Cargo.lock /build/
+RUN cargo fetch
+COPY ./templates/ /build/templates
+COPY ./src/ /build/src
 ENV RUSTFLAGS "-C target-cpu=native"
 RUN cargo build --release
-RUN cp ./target/release/axum ./target/release/axum-techempower
 
+FROM gcr.io/distroless/cc-debian12
+ENV POSTGRES_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
+ENV POSTGRES_MIN_POOL_SIZE=56
+ENV POSTGRES_MAX_POOL_SIZE=56
+ENV MONGODB_URL=mongodb://tfb-database:27017
+ENV MONGODB_MIN_POOL_SIZE=28
+ENV MONGODB_MAX_POOL_SIZE=14
+COPY --from=builder /build/target/release/axum* /app/
 EXPOSE 8000
-
-CMD ["./run.sh"]
+CMD ["/app/axum"]

+ 13 - 0
frameworks/Rust/axum/benchmark_config.json

@@ -3,6 +3,8 @@
   "tests": [
     {
       "default": {
+        "dockerfile": "axum.dockerfile",
+        "docker_cmd": "/app/axum",
         "json_url": "/json",
         "plaintext_url": "/plaintext",
         "port": 8000,
@@ -22,8 +24,11 @@
         "versus": "None"
       },
       "sqlx": {
+        "dockerfile": "axum.dockerfile",
+        "docker_cmd": "/app/axum-sqlx",
         "db_url": "/db",
         "fortune_url": "/fortunes",
+        "cached_query_url": "/cached-queries?queries=",
         "port": 8000,
         "approach": "Realistic",
         "classification": "Fullstack",
@@ -41,6 +46,8 @@
         "versus": "None"
       },
       "pg": {
+        "dockerfile": "axum.dockerfile",
+        "docker_cmd": "/app/axum-pg",
         "db_url": "/db",
         "fortune_url": "/fortunes",
         "query_url": "/queries?queries=",
@@ -62,6 +69,8 @@
         "versus": "None"
       },
       "pg-pool": {
+        "dockerfile": "axum.dockerfile",
+        "docker_cmd": "/app/axum-pg-pool",
         "db_url": "/db",
         "query_url": "/queries?queries=",
         "update_url": "/updates?queries=",
@@ -83,6 +92,8 @@
         "versus": "None"
       },
       "mongo": {
+        "dockerfile": "axum.dockerfile",
+        "docker_cmd": "/app/axum-mongo",
         "db_url": "/db",
         "query_url": "/queries?queries=",
         "fortune_url": "/fortunes",
@@ -104,6 +115,8 @@
         "versus": "None"
       },
       "mongo-raw": {
+        "dockerfile": "axum.dockerfile",
+        "docker_cmd": "/app/axum-mongo-raw",
         "db_url": "/db",
         "query_url": "/queries?queries=",
         "update_url": "/updates?queries=",

+ 2 - 1
frameworks/Rust/axum/config.toml

@@ -8,6 +8,7 @@ urls.db = "/db"
 urls.query = "/queries?q="
 urls.update = "/updates?q="
 urls.fortune = "/fortunes"
+urls.cached_query = "/cached-queries?q="
 approach = "Realistic"
 classification = "Fullstack"
 database = "Postgres"
@@ -16,4 +17,4 @@ os = "Linux"
 orm = "Raw"
 platform = "Rust"
 webserver = "Hyper"
-versus = "None"
+versus = "None"

+ 51 - 0
frameworks/Rust/axum/src/common/mod.rs

@@ -0,0 +1,51 @@
+use std::{env, str::FromStr};
+
+use core::fmt::Debug;
+use rand::{distributions::Uniform, rngs::SmallRng, Rng};
+pub mod models;
+pub mod utils;
+
+#[cfg(feature = "simd-json")]
+pub mod simd_json;
+
+#[allow(dead_code)]
+pub const SELECT_ALL_FORTUNES: &str = "SELECT * FROM fortune";
+#[allow(dead_code)]
+pub const SELECT_WORLD_BY_ID: &str =
+    "SELECT id, randomnumber FROM world WHERE id = $1 LIMIT 1";
+#[allow(dead_code)]
+pub const SELECT_ALL_CACHED_WORLDS: &str =
+    "SELECT id, randomnumber FROM world ORDER BY id";
+#[allow(dead_code)]
+pub const UPDATE_WORLDS: &str = "WITH vals AS (SELECT * FROM UNNEST($1::int[], $2::int[]) AS v(id, rnum))
+    UPDATE world SET randomnumber = new.rnum FROM
+  (SELECT w.id, v.rnum FROM world w INNER JOIN vals v ON v.id = w.id ORDER BY w.id FOR UPDATE) AS new
+  WHERE world.id = new.id";
+
+/// Return the value of an environment variable.
+#[allow(dead_code)]
+pub fn get_env<T: FromStr>(key: &str) -> T
+where
+    <T as FromStr>::Err: Debug,
+{
+    env::var(key)
+        .unwrap_or_else(|_| panic!("{key} environment variable was not set"))
+        .parse::<T>()
+        .unwrap_or_else(|_| panic!("could not parse {key}"))
+}
+
+/// Generate a single integer in the range 1 to 10,000 (inclusive)
+#[allow(dead_code)]
+#[inline]
+pub fn random_id(rng: &mut SmallRng) -> i32 {
+    rng.gen_range(1..10_001)
+}
+
+/// Generate vector of integers in the range 1 to 10,000 (inclusive)
+#[allow(dead_code)]
+#[inline]
+pub fn random_ids(rng: &mut SmallRng, count: usize) -> Vec<i32> {
+    rng.sample_iter(Uniform::new(1, 10_001))
+        .take(count)
+        .collect()
+}

+ 5 - 0
frameworks/Rust/axum/src/models_mongo.rs → frameworks/Rust/axum/src/common/models.rs

@@ -1,5 +1,10 @@
 use serde::{Deserialize, Serialize};
 
+#[derive(Serialize)]
+pub struct Message {
+    pub message: &'static str,
+}
+
 #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
 pub struct Fortune {
     pub id: i32,

+ 152 - 0
frameworks/Rust/axum/src/common/simd_json.rs

@@ -0,0 +1,152 @@
+use axum::extract::rejection::JsonRejection::MissingJsonContentType;
+use axum::extract::Request;
+use axum::extract::{rejection::*, FromRequest};
+use axum::{async_trait, http};
+use axum_core::response::{IntoResponse, Response};
+use bytes::{BufMut, Bytes, BytesMut};
+use http::{
+    header::{self, HeaderMap, HeaderValue},
+    StatusCode,
+};
+use serde::{de::DeserializeOwned, Serialize};
+use simd_json;
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct Json<T>(pub T);
+
+pub enum SimdJsonRejection {
+    Json(JsonRejection),
+    Bytes(BytesRejection),
+    Simd(String),
+}
+
+impl IntoResponse for SimdJsonRejection {
+    fn into_response(self) -> Response {
+        todo!()
+    }
+}
+
+impl From<JsonRejection> for SimdJsonRejection {
+    fn from(err: JsonRejection) -> Self {
+        SimdJsonRejection::Json(err)
+    }
+}
+
+impl From<BytesRejection> for SimdJsonRejection {
+    fn from(err: BytesRejection) -> Self {
+        SimdJsonRejection::Bytes(err)
+    }
+}
+
+impl From<simd_json::Error> for SimdJsonRejection {
+    fn from(err: simd_json::Error) -> Self {
+        SimdJsonRejection::Simd(err.to_string())
+    }
+}
+
+#[async_trait]
+impl<T, S> FromRequest<S> for Json<T>
+where
+    T: DeserializeOwned,
+    S: Send + Sync,
+{
+    type Rejection = SimdJsonRejection;
+
+    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
+        if json_content_type(req.headers()) {
+            let bytes = Bytes::from_request(req, state).await?;
+            Self::from_bytes(&bytes)
+        } else {
+            Err(SimdJsonRejection::Json(MissingJsonContentType(
+                axum::extract::rejection::MissingJsonContentType::default(),
+            )))
+        }
+    }
+}
+
+fn json_content_type(headers: &HeaderMap) -> bool {
+    let content_type = if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
+        content_type
+    } else {
+        return false;
+    };
+
+    let content_type = if let Ok(content_type) = content_type.to_str() {
+        content_type
+    } else {
+        return false;
+    };
+
+    let mime = if let Ok(mime) = content_type.parse::<mime::Mime>() {
+        mime
+    } else {
+        return false;
+    };
+
+    let is_json_content_type = mime.type_() == "application"
+        && (mime.subtype() == "json"
+            || mime.suffix().map_or(false, |name| name == "json"));
+
+    is_json_content_type
+}
+
+axum_core::__impl_deref!(Json);
+
+impl<T> From<T> for Json<T> {
+    fn from(inner: T) -> Self {
+        Self(inner)
+    }
+}
+
+impl<T> Json<T>
+where
+    T: DeserializeOwned,
+{
+    /// Construct a `Json<T>` from a byte slice. Most users should prefer to use the `FromRequest` impl
+    /// but special cases may require first extracting a `Request` into `Bytes` then optionally
+    /// constructing a `Json<T>`.
+    pub fn from_bytes(bytes: &[u8]) -> Result<Self, SimdJsonRejection> {
+        let body = &mut bytes.to_owned();
+        let deserializer = simd_json::from_slice::<T>(body);
+
+        let value = match deserializer {
+            Ok(v) => v,
+            Err(err) => {
+                let rejection = { SimdJsonRejection::from(err) };
+                return Err(rejection);
+            }
+        };
+
+        Ok(Json(value))
+    }
+}
+
+impl<T> IntoResponse for Json<T>
+where
+    T: Serialize,
+{
+    fn into_response(self) -> Response {
+        // Use a small initial capacity of 128 bytes like serde_json::to_vec
+        // https://docs.rs/serde_json/1.0.82/src/serde_json/ser.rs.html#2189
+        let mut buf = BytesMut::with_capacity(128).writer();
+        match simd_json::to_writer(&mut buf, &self.0) {
+            Ok(()) => (
+                [(
+                    header::CONTENT_TYPE,
+                    HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()),
+                )],
+                buf.into_inner().freeze(),
+            )
+                .into_response(),
+            Err(err) => (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                [(
+                    header::CONTENT_TYPE,
+                    HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref()),
+                )],
+                err.to_string(),
+            )
+                .into_response(),
+        }
+    }
+}

+ 3 - 21
frameworks/Rust/axum/src/utils.rs → frameworks/Rust/axum/src/common/utils.rs

@@ -1,35 +1,17 @@
-use std::{env, fmt::Debug, str::FromStr};
-
 use axum::{
-    body::{Bytes, Full},
+    body::Bytes,
     http::{header, HeaderValue, StatusCode},
     response::{IntoResponse, Response},
 };
-use rand::{rngs::SmallRng, Rng};
 use serde::Deserialize;
 
-pub fn get_environment_variable<T: FromStr>(key: &str) -> T
-where
-    <T as FromStr>::Err: Debug,
-{
-    env::var(key)
-        .unwrap_or_else(|_| panic!("{key} environment variable was not set"))
-        .parse::<T>()
-        .unwrap_or_else(|_| panic!("could not parse {key}"))
-}
-
 #[derive(Debug, Deserialize)]
 pub struct Params {
     queries: Option<String>,
 }
 
 #[allow(dead_code)]
-pub fn random_number(rng: &mut SmallRng) -> i32 {
-    (rng.gen::<u32>() % 10_000 + 1) as i32
-}
-
-#[allow(dead_code)]
-pub fn parse_params(params: Params) -> i32 {
+pub fn parse_params(params: Params) -> usize {
     params
         .queries
         .and_then(|q| q.parse().ok())
@@ -52,7 +34,7 @@ pub struct Utf8Html<T>(pub T);
 
 impl<T> IntoResponse for Utf8Html<T>
 where
-    T: Into<Full<Bytes>>,
+    T: Into<Bytes>,
 {
     fn into_response(self) -> Response {
         let mut res = (StatusCode::OK, self.0.into()).into_response();

+ 0 - 199
frameworks/Rust/axum/src/database_pg.rs

@@ -1,199 +0,0 @@
-use std::{collections::HashMap, convert::Infallible, fmt::Write, io, sync::Arc};
-
-use axum::{async_trait, extract::FromRequestParts, http::request::Parts};
-use futures::{
-    stream::futures_unordered::FuturesUnordered, FutureExt, StreamExt, TryStreamExt,
-};
-use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
-use tokio::pin;
-use tokio_postgres::{connect, types::ToSql, Client, NoTls, Statement};
-
-use crate::models_pg::{Fortune, World};
-
-#[derive(Debug)]
-pub enum PgError {
-    Io(io::Error),
-    Pg(tokio_postgres::Error),
-}
-
-impl From<io::Error> for PgError {
-    fn from(err: io::Error) -> Self {
-        PgError::Io(err)
-    }
-}
-
-impl From<tokio_postgres::Error> for PgError {
-    fn from(err: tokio_postgres::Error) -> Self {
-        PgError::Pg(err)
-    }
-}
-
-/// Postgres interface
-pub struct PgConnection {
-    client: Client,
-    fortune: Statement,
-    world: Statement,
-    updates: HashMap<u16, Statement>,
-}
-
-impl PgConnection {
-    pub async fn connect(db_url: String) -> Arc<PgConnection> {
-        let (cl, conn) = connect(&db_url, NoTls)
-            .await
-            .expect("can not connect to postgresql");
-
-        // Spawn connection
-        tokio::spawn(async move {
-            if let Err(error) = conn.await {
-                eprintln!("Connection error: {error}");
-            }
-        });
-
-        let fortune = cl.prepare("SELECT * FROM fortune").await.unwrap();
-        let mut updates = HashMap::new();
-
-        for num in 1..=500u16 {
-            let mut pl = 1;
-            let mut q = String::new();
-
-            q.push_str("UPDATE world SET randomnumber = CASE id ");
-
-            for _ in 1..=num {
-                let _ = write!(q, "when ${pl} then ${} ", pl + 1);
-                pl += 2;
-            }
-
-            q.push_str("ELSE randomnumber END WHERE id IN (");
-
-            for _ in 1..=num {
-                let _ = write!(q, "${pl},");
-                pl += 1;
-            }
-
-            q.pop();
-            q.push(')');
-
-            updates.insert(num, cl.prepare(&q).await.unwrap());
-        }
-
-        let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
-
-        Arc::new(PgConnection {
-            client: cl,
-            fortune,
-            world,
-            updates,
-        })
-    }
-}
-
-impl PgConnection {
-    async fn query_one_world(&self, id: i32) -> Result<World, PgError> {
-        let stream = self.client.query_raw(&self.world, &[&id]).await?;
-        pin!(stream);
-        let row = stream.next().await.unwrap()?;
-        Ok(World {
-            id: row.get(0),
-            randomnumber: row.get(1),
-        })
-    }
-
-    pub async fn get_world(&self) -> Result<World, PgError> {
-        let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-
-        let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-        let world = self.query_one_world(random_id).await?;
-        Ok(world)
-    }
-
-    pub async fn get_worlds(&self, num: usize) -> Result<Vec<World>, PgError> {
-        let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-
-        let worlds = FuturesUnordered::new();
-
-        for _ in 0..num {
-            let w_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-            worlds.push(self.query_one_world(w_id));
-        }
-
-        worlds.try_collect().await
-    }
-
-    pub async fn update(&self, num: u16) -> Result<Vec<World>, PgError> {
-        let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-
-        let worlds = FuturesUnordered::new();
-
-        for _ in 0..num {
-            let id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-            let w_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-            worlds.push(self.query_one_world(w_id).map(move |res| match res {
-                Ok(mut world) => {
-                    world.randomnumber = id;
-                    Ok(world)
-                }
-
-                Err(err) => Err(err),
-            }));
-        }
-
-        let st = self.updates.get(&num).unwrap().clone();
-
-        let worlds: Vec<World> = worlds.try_collect().await?;
-
-        let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(num as usize * 3);
-
-        for w in &worlds {
-            params.push(&w.id);
-            params.push(&w.randomnumber);
-        }
-
-        for w in &worlds {
-            params.push(&w.id);
-        }
-
-        self.client.query(&st, &params[..]).await?;
-
-        Ok(worlds)
-    }
-
-    pub async fn tell_fortune(&self) -> Result<Vec<Fortune>, PgError> {
-        let mut items = vec![Fortune {
-            id: 0,
-            message: "Additional fortune added at request time.".parse().unwrap(),
-        }];
-
-        let fut = self.client.query_raw::<_, _, &[i32; 0]>(&self.fortune, &[]);
-
-        let stream = fut.await?;
-        pin!(stream);
-
-        while let Some(row) = stream.next().await {
-            let row = row?;
-
-            items.push(Fortune {
-                id: row.get(0),
-                message: row.get(1),
-            });
-        }
-
-        items.sort_by(|it, next| it.message.cmp(&next.message));
-        Ok(items)
-    }
-}
-
-pub struct DatabaseConnection(pub Arc<PgConnection>);
-
-#[async_trait]
-impl FromRequestParts<Arc<PgConnection>> for DatabaseConnection {
-    type Rejection = Infallible;
-
-    async fn from_request_parts(
-        _parts: &mut Parts,
-        pg_connection: &Arc<PgConnection>,
-    ) -> Result<Self, Self::Rejection> {
-        Ok(Self(pg_connection.clone()))
-    }
-}

+ 0 - 86
frameworks/Rust/axum/src/database_sqlx.rs

@@ -1,86 +0,0 @@
-use std::io;
-
-use axum::{
-    async_trait,
-    extract::FromRequestParts,
-    http::{request::Parts, StatusCode},
-};
-use sqlx::{
-    pool::PoolConnection,
-    postgres::{PgArguments, PgPoolOptions},
-    Arguments, PgPool, Postgres,
-};
-
-use crate::{utils::internal_error, Fortune, World};
-
-#[derive(Debug)]
-pub enum PgError {
-    Io(io::Error),
-    Pg(sqlx::Error),
-}
-
-impl From<io::Error> for PgError {
-    fn from(err: io::Error) -> Self {
-        PgError::Io(err)
-    }
-}
-
-impl From<sqlx::Error> for PgError {
-    fn from(err: sqlx::Error) -> Self {
-        PgError::Pg(err)
-    }
-}
-
-pub async fn create_pool(
-    database_url: String,
-    max_pool_size: u32,
-    min_pool_size: u32,
-) -> PgPool {
-    PgPoolOptions::new()
-        .max_connections(max_pool_size)
-        .min_connections(min_pool_size)
-        .connect(&database_url)
-        .await
-        .unwrap()
-}
-
-pub struct DatabaseConnection(pub PoolConnection<Postgres>);
-
-#[async_trait]
-impl FromRequestParts<PgPool> for DatabaseConnection {
-    type Rejection = (StatusCode, String);
-
-    async fn from_request_parts(
-        _parts: &mut Parts,
-        pool: &PgPool,
-    ) -> Result<Self, Self::Rejection> {
-        let conn = pool.acquire().await.map_err(internal_error)?;
-
-        Ok(Self(conn))
-    }
-}
-
-pub async fn fetch_world(
-    mut conn: PoolConnection<Postgres>,
-    number: i32,
-) -> Result<World, PgError> {
-    let mut args = PgArguments::default();
-    args.add(number);
-
-    let world: World =
-        sqlx::query_as_with("SELECT id, randomnumber FROM World WHERE id = $1", args)
-            .fetch_one(&mut *conn)
-            .await
-            .expect("error loading world");
-    Ok(world)
-}
-
-pub async fn fetch_fortunes(
-    mut conn: PoolConnection<Postgres>,
-) -> Result<Vec<Fortune>, PgError> {
-    let fortunes: Vec<Fortune> = sqlx::query_as("SELECT * FROM Fortune")
-        .fetch_all(&mut *conn)
-        .await
-        .expect("error loading Fortunes");
-    Ok(fortunes)
-}

+ 16 - 26
frameworks/Rust/axum/src/main.rs

@@ -1,21 +1,21 @@
-use axum::{
-    http::{header, HeaderValue, StatusCode},
-    response::IntoResponse,
-    routing::get,
-    Json, Router,
-};
-use dotenv::dotenv;
-use tower_http::set_header::SetResponseHeaderLayer;
-
-mod models_common;
+mod common;
 mod server;
 
-use self::models_common::Message;
+use axum::{http::StatusCode, response::IntoResponse, routing::get, Router};
+use common::models::Message;
+use dotenv::dotenv;
+
+#[cfg(not(feature = "simd-json"))]
+use axum::Json;
+#[cfg(feature = "simd-json")]
+use common::simd_json::Json;
 
-pub async fn plaintext() -> &'static str {
-    "Hello, World!"
+/// Return a plaintext static string.
+pub async fn plaintext() -> impl IntoResponse {
+    (StatusCode::OK, "Hello, World!")
 }
 
+/// Return a JSON message.
 pub async fn json() -> impl IntoResponse {
     let message = Message {
         message: "Hello, World!",
@@ -28,19 +28,9 @@ pub async fn json() -> impl IntoResponse {
 async fn main() {
     dotenv().ok();
 
-    let server_header_value = HeaderValue::from_static("Axum");
-
     let app = Router::new()
         .route("/plaintext", get(plaintext))
-        .route("/json", get(json))
-        .layer(SetResponseHeaderLayer::if_not_present(
-            header::SERVER,
-            server_header_value,
-        ));
-
-    server::builder()
-        .http1_pipeline_flush(true)
-        .serve(app.into_make_service())
-        .await
-        .unwrap();
+        .route("/json", get(json));
+
+    server::serve_hyper(app, Some(8000)).await
 }

+ 30 - 46
frameworks/Rust/axum/src/main_mongo.rs

@@ -1,11 +1,20 @@
+mod common;
+mod mongo;
+//mod mongo_raw;
+
 use std::time::Duration;
 
 use axum::{
-    extract::Query,
-    http::{header, HeaderValue, StatusCode},
-    response::IntoResponse,
-    routing::get,
-    Json, Router,
+    extract::Query, http::StatusCode, response::IntoResponse, routing::get, Router,
+};
+
+#[cfg(not(feature = "simd-json"))]
+use axum::Json;
+#[cfg(feature = "simd-json")]
+use common::simd_json::Json;
+use common::{
+    models::{FortuneInfo, World},
+    random_ids,
 };
 use dotenv::dotenv;
 use mongodb::{
@@ -13,21 +22,16 @@ use mongodb::{
     Client,
 };
 use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
-use tower_http::set_header::SetResponseHeaderLayer;
 use yarte::Template;
 
-mod database_mongo;
-mod models_common;
-mod models_mongo;
 mod server;
-mod utils;
-
-use self::{
-    database_mongo::{
-        fetch_fortunes, find_world_by_id, find_worlds, update_worlds, DatabaseConnection,
-    },
-    models_mongo::{Fortune, FortuneInfo, World},
-    utils::{get_environment_variable, parse_params, Params, Utf8Html},
+
+use common::{
+    get_env,
+    utils::{parse_params, Params, Utf8Html},
+};
+use mongo::database::{
+    fetch_fortunes, find_world_by_id, find_worlds, update_worlds, DatabaseConnection,
 };
 
 #[derive(Template)]
@@ -55,13 +59,7 @@ async fn queries(
     let q = parse_params(params);
 
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-    let mut ids: Vec<i32> = Vec::with_capacity(q as usize);
-
-    for _ in 0..q {
-        let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-        ids.push(random_id);
-    }
+    let ids = random_ids(&mut rng, q);
 
     let worlds = find_worlds(db, ids).await;
     let results = worlds.expect("worlds could not be retrieved");
@@ -76,18 +74,12 @@ async fn updates(
     let q = parse_params(params);
 
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-    let mut ids: Vec<i32> = Vec::with_capacity(q as usize);
-
-    for _ in 0..q {
-        let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-        ids.push(random_id);
-    }
+    let ids = random_ids(&mut rng, q);
 
     let worlds = find_worlds(db.clone(), ids)
         .await
         .expect("worlds could not be retrieved");
-    let mut updated_worlds: Vec<World> = Vec::with_capacity(q as usize);
+    let mut updated_worlds: Vec<World> = Vec::with_capacity(q);
 
     for mut world in worlds {
         let random_number = (rng.gen::<u32>() % 10_000 + 1) as i32;
@@ -144,9 +136,9 @@ fn main() {
 }
 
 async fn serve() {
-    let database_url: String = get_environment_variable("AXUM_TECHEMPOWER_MONGODB_URL");
-    let max_pool_size: u32 = get_environment_variable("AXUM_TECHEMPOWER_MAX_POOL_SIZE");
-    let min_pool_size: u32 = get_environment_variable("AXUM_TECHEMPOWER_MIN_POOL_SIZE");
+    let database_url: String = get_env("MONGODB_URL");
+    let max_pool_size: u32 = get_env("MONGODB_MAX_POOL_SIZE");
+    let min_pool_size: u32 = get_env("MONGODB_MIN_POOL_SIZE");
 
     let mut client_options = ClientOptions::parse(database_url).await.unwrap();
 
@@ -168,21 +160,13 @@ async fn serve() {
 
     let client = Client::with_options(client_options).unwrap();
     let database = client.database("hello_world");
-    let server_header_value = HeaderValue::from_static("Axum");
 
     let app = Router::new()
         .route("/fortunes", get(fortunes))
         .route("/db", get(db))
         .route("/queries", get(queries))
         .route("/updates", get(updates))
-        .with_state(database)
-        .layer(SetResponseHeaderLayer::if_not_present(
-            header::SERVER,
-            server_header_value,
-        ));
-
-    server::builder()
-        .serve(app.into_make_service())
-        .await
-        .unwrap();
+        .with_state(database);
+
+    server::serve(app, Some(8000)).await
 }

+ 30 - 50
frameworks/Rust/axum/src/main_mongo_raw.rs

@@ -1,38 +1,38 @@
+mod common;
+mod mongo_raw;
+mod server;
+
+use common::{models::World, random_id, random_ids};
+use mongo_raw::database::{
+    find_world_by_id, find_worlds, update_worlds, DatabaseConnection,
+};
+
+use common::{
+    get_env,
+    utils::{parse_params, Params},
+};
 use std::time::Duration;
 
 use axum::{
-    extract::Query,
-    http::{header, HeaderValue, StatusCode},
-    response::IntoResponse,
-    routing::get,
-    Json, Router,
+    extract::Query, http::StatusCode, response::IntoResponse, routing::get, Router,
 };
+
+#[cfg(not(feature = "simd-json"))]
+use axum::Json;
+#[cfg(feature = "simd-json")]
+use common::simd_json::Json;
+
 use dotenv::dotenv;
 use mongodb::{
     options::{ClientOptions, Compressor},
     Client,
 };
 use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
-use tower_http::set_header::SetResponseHeaderLayer;
-
-mod database_mongo_raw;
-mod models_common;
-mod models_mongo;
-mod server;
-mod utils;
-
-use self::{
-    database_mongo_raw::{
-        find_world_by_id, find_worlds, update_worlds, DatabaseConnection,
-    },
-    models_mongo::World,
-    utils::{get_environment_variable, parse_params, Params},
-};
 
 async fn db(DatabaseConnection(db): DatabaseConnection) -> impl IntoResponse {
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
 
-    let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
+    let random_id = random_id(&mut rng);
 
     let world = find_world_by_id(db, random_id)
         .await
@@ -48,13 +48,7 @@ async fn queries(
     let q = parse_params(params);
 
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-    let mut ids: Vec<i32> = Vec::with_capacity(q as usize);
-
-    for _ in 0..q {
-        let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-        ids.push(random_id);
-    }
+    let ids = random_ids(&mut rng, q);
 
     let worlds = find_worlds(db, ids).await;
     let results = worlds.expect("worlds could not be retrieved");
@@ -69,18 +63,12 @@ async fn updates(
     let q = parse_params(params);
 
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-    let mut ids: Vec<i32> = Vec::with_capacity(q as usize);
-
-    for _ in 0..q {
-        let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-        ids.push(random_id);
-    }
 
+    let ids = random_ids(&mut rng, q);
     let worlds = find_worlds(db.clone(), ids)
         .await
         .expect("worlds could not be retrieved");
-    let mut updated_worlds: Vec<World> = Vec::with_capacity(q as usize);
+    let mut updated_worlds: Vec<World> = Vec::with_capacity(q);
 
     for mut world in worlds {
         let random_number = (rng.gen::<u32>() % 10_000 + 1) as i32;
@@ -117,9 +105,9 @@ fn main() {
 }
 
 async fn serve() {
-    let database_url: String = get_environment_variable("AXUM_TECHEMPOWER_MONGODB_URL");
-    let max_pool_size: u32 = get_environment_variable("AXUM_TECHEMPOWER_MAX_POOL_SIZE");
-    let min_pool_size: u32 = get_environment_variable("AXUM_TECHEMPOWER_MIN_POOL_SIZE");
+    let database_url: String = get_env("MONGODB_URL");
+    let max_pool_size: u32 = get_env("MONGODB_MAX_POOL_SIZE");
+    let min_pool_size: u32 = get_env("MONGODB_MIN_POOL_SIZE");
 
     let mut client_options = ClientOptions::parse(database_url).await.unwrap();
 
@@ -141,20 +129,12 @@ async fn serve() {
 
     let client = Client::with_options(client_options).unwrap();
     let database = client.database("hello_world");
-    let server_header_value = HeaderValue::from_static("Axum");
 
     let app = Router::new()
         .route("/db", get(db))
         .route("/queries", get(queries))
         .route("/updates", get(updates))
-        .with_state(database)
-        .layer(SetResponseHeaderLayer::if_not_present(
-            header::SERVER,
-            server_header_value,
-        ));
-
-    server::builder()
-        .serve(app.into_make_service())
-        .await
-        .unwrap();
+        .with_state(database);
+
+    server::serve(app, Some(8000)).await
 }

+ 36 - 54
frameworks/Rust/axum/src/main_pg.rs

@@ -1,25 +1,26 @@
+mod common;
+mod pg;
+
 use axum::{
-    extract::Query,
-    http::{header, HeaderValue, StatusCode},
-    response::IntoResponse,
-    routing::get,
-    Json, Router,
+    extract::Query, http::StatusCode, response::IntoResponse, routing::get, Router,
 };
 use dotenv::dotenv;
-use tower_http::set_header::SetResponseHeaderLayer;
+use rand::{rngs::SmallRng, thread_rng, SeedableRng};
 use yarte::Template;
 
-mod database_pg;
-mod models_common;
-mod models_pg;
+#[cfg(not(feature = "simd-json"))]
+use axum::Json;
+#[cfg(feature = "simd-json")]
+use common::simd_json::Json;
+
 mod server;
-mod utils;
 
-use self::{
-    database_pg::{DatabaseConnection, PgConnection},
-    models_pg::Fortune,
-    utils::{get_environment_variable, parse_params, Params, Utf8Html},
+use common::{
+    get_env, random_id,
+    utils::{parse_params, Params, Utf8Html},
 };
+use pg::database::{DatabaseConnection, PgConnection};
+use pg::models::Fortune;
 
 #[derive(Template)]
 #[template(path = "fortunes.html.hbs")]
@@ -28,7 +29,12 @@ pub struct FortunesTemplate<'a> {
 }
 
 async fn db(DatabaseConnection(conn): DatabaseConnection) -> impl IntoResponse {
-    let world = conn.get_world().await.expect("error loading world");
+    let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
+
+    let world = conn
+        .fetch_world_by_id(random_id(&mut rng))
+        .await
+        .expect("error loading world");
 
     (StatusCode::OK, Json(world))
 }
@@ -40,7 +46,7 @@ async fn queries(
     let q = parse_params(params);
 
     let results = conn
-        .get_worlds(q as usize)
+        .fetch_random_worlds(q)
         .await
         .expect("error loading worlds");
 
@@ -48,8 +54,10 @@ async fn queries(
 }
 
 async fn fortunes(DatabaseConnection(conn): DatabaseConnection) -> impl IntoResponse {
-    let fortunes: Vec<Fortune> =
-        conn.tell_fortune().await.expect("error loading fortunes");
+    let fortunes: Vec<Fortune> = conn
+        .fetch_all_fortunes()
+        .await
+        .expect("error loading fortunes");
 
     Utf8Html(
         FortunesTemplate {
@@ -65,52 +73,26 @@ async fn updates(
     Query(params): Query<Params>,
 ) -> impl IntoResponse {
     let q = parse_params(params);
+    let worlds = conn.update_worlds(q).await.expect("error updating worlds");
 
-    let results = conn.update(q as u16).await.expect("error updating worlds");
-
-    (StatusCode::OK, Json(results))
+    (StatusCode::OK, Json(worlds))
 }
 
-fn main() {
+#[tokio::main]
+async fn main() {
     dotenv().ok();
 
-    let rt = tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap();
-
-    for _ in 1..num_cpus::get() {
-        std::thread::spawn(move || {
-            let rt = tokio::runtime::Builder::new_current_thread()
-                .enable_all()
-                .build()
-                .unwrap();
-            rt.block_on(serve());
-        });
-    }
-    rt.block_on(serve());
-}
-
-async fn serve() {
-    let database_url: String = get_environment_variable("AXUM_TECHEMPOWER_DATABASE_URL");
+    let database_url: String = get_env("POSTGRES_URL");
 
-    // setup connection pool
+    // Create shared database connection
     let pg_connection = PgConnection::connect(database_url).await;
-    let server_header_value = HeaderValue::from_static("Axum");
 
-    let router = Router::new()
+    let app = Router::new()
         .route("/fortunes", get(fortunes))
         .route("/db", get(db))
         .route("/queries", get(queries))
         .route("/updates", get(updates))
-        .with_state(pg_connection)
-        .layer(SetResponseHeaderLayer::if_not_present(
-            header::SERVER,
-            server_header_value,
-        ));
-
-    server::builder()
-        .serve(router.into_make_service())
-        .await
-        .unwrap();
+        .with_state(pg_connection);
+
+    server::serve_hyper(app, Some(8000)).await
 }

+ 57 - 79
frameworks/Rust/axum/src/main_pg_pool.rs

@@ -1,31 +1,31 @@
+mod common;
+mod pg_pool;
+
 use axum::{
-    extract::Query,
-    http::{header, HeaderValue, StatusCode},
-    response::IntoResponse,
-    routing::get,
-    Json, Router,
+    extract::Query, http::StatusCode, response::IntoResponse, routing::get, Router,
 };
+
+#[cfg(not(feature = "simd-json"))]
+use axum::Json;
+#[cfg(feature = "simd-json")]
+use common::simd_json::Json;
+
+use common::{random_ids, SELECT_ALL_FORTUNES, SELECT_WORLD_BY_ID, UPDATE_WORLDS};
 use dotenv::dotenv;
 use futures_util::{stream::FuturesUnordered, TryStreamExt};
-use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
-use tower_http::set_header::SetResponseHeaderLayer;
+use rand::{rngs::SmallRng, thread_rng, SeedableRng};
 use yarte::Template;
 
-mod database_pg_pool;
-mod models_common;
-mod models_pg_pool;
 mod server;
-mod utils;
-
-use self::{
-    database_pg_pool::{
-        create_pool, fetch_all_fortunes, fetch_world_by_id,
-        prepare_fetch_all_fortunes_statement, prepare_fetch_world_by_id_statement,
-        prepare_update_world_by_id_statement, update_world, DatabaseClient, PgError,
-    },
-    models_pg_pool::{Fortune, World},
-    utils::{get_environment_variable, parse_params, random_number, Params, Utf8Html},
+
+use common::{
+    get_env, random_id,
+    utils::{parse_params, Params, Utf8Html},
 };
+use pg_pool::database::{
+    create_pool, fetch_all_fortunes, fetch_world_by_id, DatabaseClient, PgError,
+};
+use pg_pool::models::{Fortune, World};
 
 #[derive(Template)]
 #[template(path = "fortunes.html.hbs")]
@@ -35,11 +35,10 @@ pub struct FortunesTemplate<'a> {
 
 async fn db(DatabaseClient(client): DatabaseClient) -> impl IntoResponse {
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
+    let random_id = random_id(&mut rng);
 
-    let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-    let select = prepare_fetch_world_by_id_statement(&client).await;
-    let world = fetch_world_by_id(&client, random_id, &select)
+    let select = &client.prepare_cached(SELECT_WORLD_BY_ID).await.unwrap();
+    let world = fetch_world_by_id(&client, random_id, select)
         .await
         .expect("could not fetch world");
 
@@ -53,15 +52,11 @@ async fn queries(
     let q = parse_params(params);
 
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
-
-    let select = prepare_fetch_world_by_id_statement(&client).await;
-
+    let select = &client.prepare_cached(SELECT_WORLD_BY_ID).await.unwrap();
     let future_worlds = FuturesUnordered::new();
 
-    for _ in 0..q {
-        let w_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-        future_worlds.push(fetch_world_by_id(&client, w_id, &select));
+    for id in random_ids(&mut rng, q) {
+        future_worlds.push(fetch_world_by_id(&client, id, select));
     }
 
     let worlds: Result<Vec<World>, PgError> = future_worlds.try_collect().await;
@@ -71,9 +66,9 @@ async fn queries(
 }
 
 async fn fortunes(DatabaseClient(client): DatabaseClient) -> impl IntoResponse {
-    let select = prepare_fetch_all_fortunes_statement(&client).await;
+    let select = &client.prepare_cached(SELECT_ALL_FORTUNES).await.unwrap();
 
-    let mut fortunes = fetch_all_fortunes(client, &select)
+    let mut fortunes = fetch_all_fortunes(client, select)
         .await
         .expect("could not fetch fortunes");
 
@@ -100,66 +95,49 @@ async fn updates(
     let q = parse_params(params);
 
     let mut rng = SmallRng::from_entropy();
+    let select = &client.prepare_cached(SELECT_WORLD_BY_ID).await.unwrap();
+    let update = &client.prepare_cached(UPDATE_WORLDS).await.unwrap();
 
-    let select = prepare_fetch_world_by_id_statement(&client).await;
-
+    // Select the random worlds.
     let future_worlds = FuturesUnordered::new();
-
-    for _ in 0..q {
-        let query_id = random_number(&mut rng);
-
-        future_worlds.push(fetch_world_by_id(&client, query_id, &select));
+    for id in random_ids(&mut rng, q) {
+        future_worlds.push(fetch_world_by_id(&client, id, select));
     }
-
-    let worlds: Result<Vec<World>, PgError> = future_worlds.try_collect().await;
-    let results = worlds.expect("worlds could not be retrieved");
-
-    let update = prepare_update_world_by_id_statement(&client).await;
-
-    let future_world_updates = FuturesUnordered::new();
-
-    for w in &results {
-        let random_id = random_number(&mut rng);
-        let w_id = w.id;
-
-        future_world_updates.push(update_world(&client, &update, random_id, w_id));
-    }
-
-    let world_updates: Result<Vec<u64>, PgError> =
-        future_world_updates.try_collect().await;
-    world_updates.expect("updates could not be executed");
-
-    (StatusCode::OK, Json(results))
+    let worlds: Vec<World> = future_worlds.try_collect().await.unwrap();
+
+    let mut ids = Vec::with_capacity(q);
+    let mut nids = Vec::with_capacity(q);
+    let worlds: Vec<World> = worlds
+        .into_iter()
+        .map(|mut w| {
+            w.randomnumber = random_id(&mut rng);
+            ids.push(w.id);
+            nids.push(w.randomnumber);
+            w
+        })
+        .collect();
+
+    // Update the random worlds in the database.
+    client.execute(update, &[&ids, &nids]).await.unwrap();
+
+    (StatusCode::OK, Json(worlds))
 }
 
 #[tokio::main]
 async fn main() {
     dotenv().ok();
 
-    serve().await;
-}
-
-async fn serve() {
-    let database_url: String = get_environment_variable("AXUM_TECHEMPOWER_DATABASE_URL");
-    let max_pool_size: u32 = get_environment_variable("AXUM_TECHEMPOWER_MAX_POOL_SIZE");
+    let database_url: String = get_env("POSTGRES_URL");
+    let max_pool_size: u32 = get_env("POSTGRES_MAX_POOL_SIZE");
 
-    // setup Client pool
     let pool = create_pool(database_url, max_pool_size).await;
-    let server_header_value = HeaderValue::from_static("Axum");
 
-    let router = Router::new()
+    let app = Router::new()
         .route("/fortunes", get(fortunes))
         .route("/db", get(db))
         .route("/queries", get(queries))
         .route("/updates", get(updates))
-        .with_state(pool)
-        .layer(SetResponseHeaderLayer::if_not_present(
-            header::SERVER,
-            server_header_value,
-        ));
-
-    server::builder()
-        .serve(router.into_make_service())
-        .await
-        .unwrap();
+        .with_state(pool);
+
+    server::serve_hyper(app, Some(8000)).await
 }

+ 100 - 43
frameworks/Rust/axum/src/main_sqlx.rs

@@ -1,27 +1,35 @@
+mod common;
+mod sqlx;
+
+use std::sync::Arc;
+
+use ::sqlx::PgPool;
 use axum::{
-    http::{header, HeaderValue, StatusCode},
+    extract::{Query, State},
+    http::StatusCode,
     response::IntoResponse,
     routing::get,
-    Json, Router,
+    Router,
 };
 use dotenv::dotenv;
-use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
-use sqlx::PgPool;
-use tower_http::set_header::SetResponseHeaderLayer;
+use moka::future::Cache;
+use rand::{rngs::SmallRng, thread_rng, SeedableRng};
+use sqlx::models::World;
 use yarte::Template;
 
-mod database_sqlx;
-mod models_common;
-mod models_sqlx;
+#[cfg(not(feature = "simd-json"))]
+use axum::Json;
+#[cfg(feature = "simd-json")]
+use common::simd_json::Json;
+
 mod server;
-mod utils;
 
-use self::{
-    database_sqlx::{create_pool, fetch_fortunes, fetch_world, DatabaseConnection},
-    models_sqlx::{Fortune, World},
-    utils::get_environment_variable,
-    utils::Utf8Html,
+use common::{
+    get_env, random_id, random_ids,
+    utils::{parse_params, Params, Utf8Html},
 };
+use sqlx::database::create_pool;
+use sqlx::models::Fortune;
 
 #[derive(Template)]
 #[template(path = "fortunes.html.hbs")]
@@ -29,22 +37,44 @@ pub struct FortunesTemplate<'a> {
     pub fortunes: &'a Vec<Fortune>,
 }
 
-async fn db(DatabaseConnection(conn): DatabaseConnection) -> impl IntoResponse {
+async fn db(State(AppState { db, .. }): State<AppState>) -> impl IntoResponse {
     let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
 
-    let random_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
-
-    let world = fetch_world(conn, random_id)
+    let world: World = ::sqlx::query_as(common::SELECT_WORLD_BY_ID)
+        .bind(random_id(&mut rng))
+        .fetch_one(&mut *db.acquire().await.unwrap())
         .await
-        .expect("could not fetch world");
+        .expect("error loading world");
 
     (StatusCode::OK, Json(world))
 }
 
-async fn fortunes(DatabaseConnection(conn): DatabaseConnection) -> impl IntoResponse {
-    let mut fortunes = fetch_fortunes(conn)
+async fn queries(
+    State(AppState { db, .. }): State<AppState>,
+    Query(params): Query<Params>,
+) -> impl IntoResponse {
+    let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
+    let count = parse_params(params);
+    let ids = random_ids(&mut rng, count);
+    let mut worlds: Vec<World> = Vec::with_capacity(count);
+
+    for id in ids {
+        let world: World = ::sqlx::query_as(common::SELECT_WORLD_BY_ID)
+            .bind(id)
+            .fetch_one(&mut *db.acquire().await.unwrap())
+            .await
+            .expect("error loading world");
+        worlds.push(world);
+    }
+
+    (StatusCode::OK, Json(worlds))
+}
+
+async fn fortunes(State(AppState { db, .. }): State<AppState>) -> impl IntoResponse {
+    let mut fortunes: Vec<Fortune> = ::sqlx::query_as(common::SELECT_ALL_FORTUNES)
+        .fetch_all(&mut *db.acquire().await.unwrap())
         .await
-        .expect("could not fetch fortunes");
+        .expect("error loading Fortunes");
 
     fortunes.push(Fortune {
         id: 0,
@@ -62,34 +92,61 @@ async fn fortunes(DatabaseConnection(conn): DatabaseConnection) -> impl IntoResp
     )
 }
 
-#[tokio::main]
-async fn main() {
-    dotenv().ok();
-
-    let database_url: String = get_environment_variable("AXUM_TECHEMPOWER_DATABASE_URL");
-    let max_pool_size: u32 = get_environment_variable("AXUM_TECHEMPOWER_MAX_POOL_SIZE");
-    let min_pool_size: u32 = get_environment_variable("AXUM_TECHEMPOWER_MIN_POOL_SIZE");
+async fn cache(
+    State(AppState { cache, .. }): State<AppState>,
+    Query(params): Query<Params>,
+) -> impl IntoResponse {
+    let count = parse_params(params);
+    let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
+    let mut worlds: Vec<Option<Arc<World>>> = Vec::with_capacity(count);
 
-    // setup connection pool
-    let pool = create_pool(database_url, max_pool_size, min_pool_size).await;
+    for id in random_ids(&mut rng, count) {
+        worlds.push(cache.get(&id).await);
+    }
 
-    let app = router(pool).await;
+    (StatusCode::OK, Json(worlds))
+}
 
-    server::builder()
-        .serve(app.into_make_service())
+/// Pre-load the cache with all worlds.
+async fn preload_cache(AppState { db, cache }: &AppState) {
+    let worlds: Vec<World> = ::sqlx::query_as(common::SELECT_ALL_CACHED_WORLDS)
+        .fetch_all(&mut *db.acquire().await.unwrap())
         .await
-        .unwrap();
+        .expect("error loading worlds");
+
+    for world in worlds {
+        cache.insert(world.id, Arc::new(world)).await;
+    }
 }
 
-async fn router(pool: PgPool) -> Router {
-    let server_header_value = HeaderValue::from_static("Axum");
+#[derive(Clone)]
+struct AppState {
+    db: PgPool,
+    cache: Cache<i32, Arc<World>>,
+}
 
-    Router::new()
+#[tokio::main]
+async fn main() {
+    dotenv().ok();
+
+    let database_url: String = get_env("POSTGRES_URL");
+    let max_pool_size: u32 = get_env("POSTGRES_MAX_POOL_SIZE");
+    let min_pool_size: u32 = get_env("POSTGRES_MIN_POOL_SIZE");
+
+    let state = AppState {
+        db: create_pool(database_url, max_pool_size, min_pool_size).await,
+        cache: Cache::new(10000),
+    };
+
+    // Prime the cache with CachedWorld objects
+    preload_cache(&state).await;
+
+    let app = Router::new()
         .route("/fortunes", get(fortunes))
         .route("/db", get(db))
-        .with_state(pool)
-        .layer(SetResponseHeaderLayer::if_not_present(
-            header::SERVER,
-            server_header_value,
-        ))
+        .route("/queries", get(queries))
+        .route("/cached-queries", get(cache))
+        .with_state(state);
+
+    server::serve_hyper(app, Some(8000)).await
 }

+ 0 - 6
frameworks/Rust/axum/src/models_common.rs

@@ -1,6 +0,0 @@
-use serde::Serialize;
-
-#[derive(Serialize)]
-pub struct Message {
-    pub message: &'static str,
-}

+ 2 - 1
frameworks/Rust/axum/src/database_mongo.rs → frameworks/Rust/axum/src/mongo/database.rs

@@ -4,7 +4,7 @@ use axum::{async_trait, extract::FromRequestParts, http::request::Parts};
 use futures_util::{stream::FuturesUnordered, StreamExt, TryStreamExt};
 use mongodb::{bson::doc, Database};
 
-use crate::{Fortune, World};
+use crate::common::models::{Fortune, World};
 
 pub struct DatabaseConnection(pub Database);
 
@@ -21,6 +21,7 @@ impl FromRequestParts<Database> for DatabaseConnection {
 }
 
 #[derive(Debug)]
+#[allow(dead_code)]
 pub enum MongoError {
     Io(io::Error),
     Mongo(mongodb::error::Error),

+ 1 - 0
frameworks/Rust/axum/src/mongo/mod.rs

@@ -0,0 +1 @@
+pub mod database;

+ 2 - 1
frameworks/Rust/axum/src/database_mongo_raw.rs → frameworks/Rust/axum/src/mongo_raw/database.rs

@@ -7,7 +7,7 @@ use mongodb::{
     Database,
 };
 
-use crate::World;
+use crate::common::models::World;
 
 pub struct DatabaseConnection(pub Database);
 
@@ -24,6 +24,7 @@ impl FromRequestParts<Database> for DatabaseConnection {
 }
 
 #[derive(Debug)]
+#[allow(dead_code)]
 pub enum MongoError {
     Io(io::Error),
     Mongo(mongodb::error::Error),

+ 1 - 0
frameworks/Rust/axum/src/mongo_raw/mod.rs

@@ -0,0 +1 @@
+pub mod database;

+ 155 - 0
frameworks/Rust/axum/src/pg/database.rs

@@ -0,0 +1,155 @@
+use std::{convert::Infallible, io, sync::Arc};
+
+use axum::{async_trait, extract::FromRequestParts, http::request::Parts};
+use futures::{stream::futures_unordered::FuturesUnordered, StreamExt, TryStreamExt};
+use rand::{rngs::SmallRng, thread_rng, SeedableRng};
+use tokio::pin;
+use tokio_postgres::{connect, Client, NoTls, Statement};
+
+use crate::common::{self, random_id, random_ids};
+
+use super::models::{Fortune, World};
+
+#[derive(Debug)]
+#[allow(dead_code)]
+pub enum PgError {
+    Io(io::Error),
+    Pg(tokio_postgres::Error),
+}
+
+impl From<io::Error> for PgError {
+    fn from(err: io::Error) -> Self {
+        PgError::Io(err)
+    }
+}
+
+impl From<tokio_postgres::Error> for PgError {
+    fn from(err: tokio_postgres::Error) -> Self {
+        PgError::Pg(err)
+    }
+}
+
+/// Postgres interface
+pub struct PgConnection {
+    client: Client,
+    fortune: Statement,
+    world: Statement,
+    updates: Statement,
+}
+
+impl PgConnection {
+    pub async fn connect(db_url: String) -> Arc<PgConnection> {
+        let (cl, conn) = connect(&db_url, NoTls)
+            .await
+            .expect("cannot connect to postgresql.");
+
+        // Spawn connection
+        tokio::spawn(async move {
+            if let Err(error) = conn.await {
+                eprintln!("database connection error: {error}");
+            }
+        });
+
+        // Prepare statements for the connection.
+        let fortune = cl.prepare(common::SELECT_ALL_FORTUNES).await.unwrap();
+        let world = cl.prepare(common::SELECT_WORLD_BY_ID).await.unwrap();
+        let updates = cl.prepare(common::UPDATE_WORLDS).await.unwrap();
+
+        Arc::new(PgConnection {
+            client: cl,
+            fortune,
+            world,
+            updates,
+        })
+    }
+}
+
+impl PgConnection {
+    pub async fn fetch_world_by_id(&self, id: i32) -> Result<World, PgError> {
+        self.client
+            .query_one(&self.world, &[&id])
+            .await
+            .map(|row| {
+                Ok(World {
+                    id: row.get(0),
+                    randomnumber: row.get(1),
+                })
+            })?
+    }
+
+    pub async fn fetch_random_worlds(&self, num: usize) -> Result<Vec<World>, PgError> {
+        let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
+
+        let futures = FuturesUnordered::new();
+
+        for id in random_ids(&mut rng, num) {
+            futures.push(self.fetch_world_by_id(id));
+        }
+
+        futures.try_collect().await
+    }
+
+    pub async fn update_worlds(&self, num: usize) -> Result<Vec<World>, PgError> {
+        let worlds = self.fetch_random_worlds(num).await?;
+
+        // Update the worlds with new random numbers
+        let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
+        let mut ids = Vec::with_capacity(num);
+        let mut nids = Vec::with_capacity(num);
+        let worlds: Vec<World> = worlds
+            .into_iter()
+            .map(|mut w| {
+                w.randomnumber = random_id(&mut rng);
+                ids.push(w.id);
+                nids.push(w.randomnumber);
+                w
+            })
+            .collect();
+
+        // Update the random worlds in the database.
+        self.client
+            .execute(&self.updates, &[&ids, &nids])
+            .await
+            .unwrap();
+
+        Ok(worlds)
+    }
+
+    pub async fn fetch_all_fortunes(&self) -> Result<Vec<Fortune>, PgError> {
+        let mut fortunes = vec![Fortune {
+            id: 0,
+            message: "Additional fortune added at request time.".parse().unwrap(),
+        }];
+
+        let rows = self
+            .client
+            .query_raw::<_, _, &[i32; 0]>(&self.fortune, &[])
+            .await?;
+
+        pin!(rows);
+
+        while let Some(row) = rows.next().await.transpose()? {
+            fortunes.push(Fortune {
+                id: row.get(0),
+                message: row.get(1),
+            });
+        }
+
+        fortunes.sort_by(|it, next| it.message.cmp(&next.message));
+        Ok(fortunes)
+    }
+}
+
+pub struct DatabaseConnection(pub Arc<PgConnection>);
+
+#[async_trait]
+impl FromRequestParts<Arc<PgConnection>> for DatabaseConnection {
+    type Rejection = Infallible;
+
+    async fn from_request_parts(
+        _parts: &mut Parts,
+        pg_connection: &Arc<PgConnection>,
+    ) -> Result<Self, Self::Rejection> {
+        Ok(Self(pg_connection.clone()))
+    }
+}

+ 2 - 0
frameworks/Rust/axum/src/pg/mod.rs

@@ -0,0 +1,2 @@
+pub mod database;
+pub mod models;

+ 0 - 0
frameworks/Rust/axum/src/models_pg.rs → frameworks/Rust/axum/src/pg/models.rs


+ 15 - 41
frameworks/Rust/axum/src/database_pg_pool.rs → frameworks/Rust/axum/src/pg_pool/database.rs

@@ -1,17 +1,22 @@
 use std::io;
 
+use crate::{
+    common::utils::internal_error,
+    pg_pool::models::{Fortune, World},
+};
 use axum::{
     async_trait,
     extract::FromRequestParts,
     http::{request::Parts, StatusCode},
 };
 use deadpool_postgres::{Client, Manager, ManagerConfig, RecyclingMethod};
+use futures_util::StreamExt;
+use tokio::pin;
 use tokio_pg_mapper::FromTokioPostgresRow;
 use tokio_postgres::{NoTls, Row, Statement};
 
-use crate::{utils::internal_error, Fortune, World};
-
 #[derive(Debug)]
+#[allow(dead_code)]
 pub enum PgError {
     Io(io::Error),
     Pg(tokio_postgres::Error),
@@ -66,57 +71,26 @@ impl FromRequestParts<deadpool_postgres::Pool> for DatabaseClient {
 
 pub async fn fetch_world_by_id(
     client: &Client,
-    number: i32,
+    id: i32,
     select: &Statement,
 ) -> Result<World, PgError> {
-    let row: Row = client.query_one(select, &[&number]).await.unwrap();
+    let row: Row = client.query_one(select, &[&id]).await.unwrap();
 
     Ok(World::from_row(row).unwrap())
 }
 
-pub async fn update_world(
-    client: &Client,
-    update: &Statement,
-    random_id: i32,
-    w_id: i32,
-) -> Result<u64, PgError> {
-    let rows_modified: u64 = client.execute(update, &[&random_id, &w_id]).await.unwrap();
-
-    Ok(rows_modified)
-}
-
 pub async fn fetch_all_fortunes(
     client: Client,
     select: &Statement,
 ) -> Result<Vec<Fortune>, PgError> {
-    let rows: Vec<Row> = client.query(select, &[]).await.unwrap();
-
-    let mut fortunes: Vec<Fortune> = Vec::with_capacity(rows.capacity());
+    let mut fortunes: Vec<Fortune> = Vec::new();
+    let rows = client.query_raw::<_, _, &[i32; 0]>(select, &[]).await?;
+    pin!(rows);
 
-    for row in rows {
-        fortunes.push(Fortune::from_row(row).unwrap());
+    while let Some(row) = rows.next().await.transpose()? {
+        fortunes
+            .push(Fortune::from_row(row).expect("could not convert row to fortune."));
     }
 
     Ok(fortunes)
 }
-
-pub async fn prepare_fetch_all_fortunes_statement(client: &Client) -> Statement {
-    client
-        .prepare_cached("SELECT * FROM Fortune")
-        .await
-        .unwrap()
-}
-
-pub async fn prepare_fetch_world_by_id_statement(client: &Client) -> Statement {
-    client
-        .prepare_cached("SELECT id, randomnumber FROM World WHERE id = $1")
-        .await
-        .unwrap()
-}
-
-pub async fn prepare_update_world_by_id_statement(client: &Client) -> Statement {
-    client
-        .prepare_cached("UPDATE World SET randomnumber = $1 WHERE id = $2")
-        .await
-        .unwrap()
-}

+ 2 - 0
frameworks/Rust/axum/src/pg_pool/mod.rs

@@ -0,0 +1,2 @@
+pub mod database;
+pub mod models;

+ 0 - 0
frameworks/Rust/axum/src/models_pg_pool.rs → frameworks/Rust/axum/src/pg_pool/models.rs


+ 94 - 25
frameworks/Rust/axum/src/server.rs

@@ -1,37 +1,106 @@
 use std::{
     io,
-    net::{Ipv4Addr, SocketAddr},
+    net::{Ipv4Addr, SocketAddr, TcpListener},
 };
 
-use hyper::server::conn::AddrIncoming;
-use tokio::net::{TcpListener, TcpSocket};
-
-pub fn builder() -> hyper::server::Builder<AddrIncoming> {
-    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8000));
-    let listener = reuse_listener(addr).expect("couldn't bind to addr");
-    let incoming = AddrIncoming::from_listener(listener).unwrap();
+use axum::{
+    http::{header, HeaderValue},
+    Router,
+};
 
-    println!("Started axum server at 8000");
+use hyper::body::Incoming;
+use hyper::Request;
+use hyper_util::rt::{TokioExecutor, TokioIo};
+use tower::Service;
+use tower_http::set_header::SetResponseHeaderLayer;
 
-    axum::Server::builder(incoming)
-        .http1_only(true)
-        .tcp_nodelay(true)
-}
+use socket2::{Domain, Socket, Type};
 
-fn reuse_listener(addr: SocketAddr) -> io::Result<TcpListener> {
+/// Reuse an existing listener, ensuring that the socket `backlog``
+/// is set to enable a higher number of pending connections.
+fn set_socket_options(addr: SocketAddr) -> io::Result<tokio::net::TcpListener> {
     let socket = match addr {
-        SocketAddr::V4(_) => TcpSocket::new_v4()?,
-        SocketAddr::V6(_) => TcpSocket::new_v6()?,
+        SocketAddr::V4(_) => Socket::new(Domain::IPV4, Type::STREAM, None)?,
+        SocketAddr::V6(_) => Socket::new(Domain::IPV6, Type::STREAM, None)?,
     };
 
-    #[cfg(unix)]
-    {
-        if let Err(e) = socket.set_reuseport(true) {
-            eprintln!("error setting SO_REUSEPORT: {e}");
-        }
-    }
+    socket.set_reuse_port(true)?;
+    socket.set_reuse_address(true)?;
+    socket.set_nonblocking(true)?;
+    socket.set_nodelay(true)?;
+    socket.bind(&addr.into())?;
+    socket.listen(4096)?;
+
+    let listener: TcpListener = socket.into();
+    tokio::net::TcpListener::from_std(listener)
+}
+
+/// Build an Axum server with consistent configuration, using the high-level API exposed
+/// by Axum 0.7. This is intended for convenience and intentionally does not provide much
+/// customisability.
+#[allow(dead_code)]
+pub async fn serve(app: Router<()>, port: Option<u16>) {
+    let port = port.unwrap_or(8000);
+    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
+    let listener = set_socket_options(addr).expect("couldn't bind to address");
+    println!("started axum server on port {port}.");
+
+    let server_header_value = HeaderValue::from_static("Axum");
+    let app = app.layer(SetResponseHeaderLayer::overriding(
+        header::SERVER,
+        server_header_value,
+    ));
 
-    socket.set_reuseaddr(true)?;
-    socket.bind(addr)?;
-    socket.listen(1024)
+    axum::serve(listener, app.into_make_service())
+        .await
+        .unwrap();
+}
+
+/// Build an Axum server using the lower-level Hyper APIs for more
+/// configurability. This has a few optimisations, including:
+/// * Serving HTTP/1 only.
+/// * Disabling connection upgrades (websockets are not needed).
+/// * Setting TCP_NODELAY on the input stream.
+/// * Aggregating flushes to better support pipelined responses.
+///
+/// See for more details:
+/// * https://github.com/tokio-rs/axum/blob/1ac617a1b540e8523347f5ee889d65cad9a45ec4/examples/serve-with-hyper/src/main.rs
+#[allow(dead_code)]
+pub async fn serve_hyper(app: Router<()>, port: Option<u16>) {
+    let port = port.unwrap_or(8000);
+    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
+    let listener = set_socket_options(addr).expect("couldn't bind to address");
+    println!("started axum server on port {port}.");
+
+    let server_header_value = HeaderValue::from_static("Axum");
+    let app = app.layer(SetResponseHeaderLayer::overriding(
+        header::SERVER,
+        server_header_value,
+    ));
+
+    // Continuously accept new connections.
+    loop {
+        let (socket, _remote_addr) = listener.accept().await.unwrap();
+        socket
+            .set_nodelay(true)
+            .expect("could not set TCP_NODELAY!");
+
+        let tower_service = app.clone();
+        tokio::spawn(async move {
+            let socket = TokioIo::new(socket);
+
+            let hyper_service =
+                hyper::service::service_fn(move |request: Request<Incoming>| {
+                    tower_service.clone().call(request)
+                });
+
+            if (hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
+                .http1()
+                .pipeline_flush(true)
+                .serve_connection(socket, hyper_service)
+                .await)
+                .is_err()
+            {}
+        });
+    }
 }

+ 36 - 0
frameworks/Rust/axum/src/sqlx/database.rs

@@ -0,0 +1,36 @@
+use std::io;
+
+use sqlx::{postgres::PgPoolOptions, PgPool};
+
+#[derive(Debug)]
+#[allow(dead_code)]
+pub enum PgError {
+    Io(io::Error),
+    Pg(sqlx::Error),
+}
+
+impl From<io::Error> for PgError {
+    fn from(err: io::Error) -> Self {
+        PgError::Io(err)
+    }
+}
+
+impl From<sqlx::Error> for PgError {
+    fn from(err: sqlx::Error) -> Self {
+        PgError::Pg(err)
+    }
+}
+
+pub async fn create_pool(
+    database_url: String,
+    max_pool_size: u32,
+    min_pool_size: u32,
+) -> PgPool {
+    PgPoolOptions::new()
+        .max_connections(max_pool_size)
+        .min_connections(min_pool_size)
+        .test_before_acquire(false)
+        .connect(&database_url)
+        .await
+        .unwrap()
+}

+ 2 - 0
frameworks/Rust/axum/src/sqlx/mod.rs

@@ -0,0 +1,2 @@
+pub mod database;
+pub mod models;

+ 0 - 0
frameworks/Rust/axum/src/models_sqlx.rs → frameworks/Rust/axum/src/sqlx/models.rs


Some files were not shown because too many files changed in this diff